Предупреждение сельдерея «Получено и удалено неизвестное сообщение»

Я настраиваю задачу в Celery, чтобы она «потребляла» из определенного обмена темами. Когда я отправляю сообщение на соответствующий обмен, я получаю сообщение об ошибке: «Получено и удалено неизвестное сообщение. Неверный пункт назначения?!?» на консоли сельдерея.

Я сделал отдельную папку проекта, чтобы воспроизвести проблему, где все называется test-something со следующей структурой:

celery-test/  
  L celery.py  
  L celeryconfig.py  
  L tasks.py

Я видел различные вопросы StackOverflow и проблемы GitHub, касающиеся пакета librabbitmq. Решение здесь состояло бы в том, чтобы удалить этот пакет, но он даже не установлен, так что это ни к чему меня не привело. Некоторые из найденных вопросов/проблем, которые предлагают это решение:
- https://github.com/сельдерей/сельдерей/issues/3675
- Celery &Rabbitmq:WARNING/MainProcess] Получено и удалено неизвестное сообщение. Не тот пункт назначения?!? - эксперимент на ЖКТ

Я также пробовал играть с настройками маршрутизации задач, так как я предполагаю, что проблема заключается в банкомате, но я не могу заставить его работать.

Для тех, кто интересуется, почему порт выключен на 1, это потому, что он указывает на rabbitmq в моем контейнере докеров, который больше не может использовать 5672.

celery.py

app = Celery('celery_test', include=['celery_test.tasks'])
app.config_from_object('celery_test.celeryconfig')

celeryconfig.py

broker_url = 'amqp://guest:guest@localhost:5673//'
result_backend = 'rpc://'

default_exchange = Exchange('default', type='direct')
test_exchange = Exchange('test_exchange', type='topic')
task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('test_queue', test_exchange, routing_key='test123test')
)

task_routes = {
    'celery_test.tasks.test_method': {
        'queue': 'test_queue'
    }
}

задачи.py

@app.task
def test_method():
    print('test_method')
    return 'test_method'

И затем файл, который я использовал для отправки сообщения: send.py

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5673/'))
channel = connection.channel()

exchange = 'test_exchange'
routing_key = 'test123test'
message = 'Testmessage'

channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)

channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
connection.close()
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
557
2

Ответы 2

На самом деле это может быть не ответ, а скорее продолжение. Но я подумал, что дам знать людям, которые столкнулись с этой проблемой. (Этот пост — моя интерпретация, и, поскольку я новичок в сельдерее, вам, вероятно, следует отнестись к этому с недоверием.)

Так что в основном причина, по которой я думаю, что это происходит, заключается в том, что Сельдерей не понимает сообщения. Celery требует много заголовков и других свойств, прежде чем он сможет понять, что пытается сделать сообщение.

Эти заголовки можно эмулировать путем обратного проектирования, но я не буду вдаваться в подробности, поскольку есть более простые способы решения приложения, которое я планирую создать.

Если есть кто-то, кто читает это с большим опытом в этом вопросе, не стесняйтесь меня поправлять.

Это для всех, кого это затронет в 2021 году. У меня есть старая служба, использующая celery 3.1.x (назовем ее «устаревшей»), и недавно созданная служба, использующая версию celery 5.0.x (назовем ее «современной»).

В современной кодовой базе метод Celery.signature() используется для создания подписи, затем apply_async() вызывается для вызова задачи в устаревшей версии. Устаревший сельдерей получил мое сообщение, но отбросил его с этой ошибкой:

Received and deleted unknown message. Wrong destination?!?

Я решил проблему, добавив эту строку в файл celeryconfig.py:

task_protocol = 1

Объяснение здесь: https://github.com/celery/celery/issues/3675#issuecomment-294129297

Другие вопросы по теме