Я настраиваю задачу в Celery, чтобы она «потребляла» из определенного обмена темами. Когда я отправляю сообщение на соответствующий обмен, я получаю сообщение об ошибке: «Получено и удалено неизвестное сообщение. Неверный пункт назначения?!?» на консоли сельдерея.
Я сделал отдельную папку проекта, чтобы воспроизвести проблему, где все называется test-something со следующей структурой:
celery-test/
L celery.py
L celeryconfig.py
L tasks.py
Я видел различные вопросы StackOverflow и проблемы GitHub, касающиеся пакета librabbitmq. Решение здесь состояло бы в том, чтобы удалить этот пакет, но он даже не установлен, так что это ни к чему меня не привело. Некоторые из найденных вопросов/проблем, которые предлагают это решение:
- https://github.com/сельдерей/сельдерей/issues/3675
- Celery &Rabbitmq:WARNING/MainProcess] Получено и удалено неизвестное сообщение. Не тот пункт назначения?!? - эксперимент на ЖКТ
Я также пробовал играть с настройками маршрутизации задач, так как я предполагаю, что проблема заключается в банкомате, но я не могу заставить его работать.
Для тех, кто интересуется, почему порт выключен на 1, это потому, что он указывает на rabbitmq в моем контейнере докеров, который больше не может использовать 5672.
celery.py
app = Celery('celery_test', include=['celery_test.tasks'])
app.config_from_object('celery_test.celeryconfig')
celeryconfig.py
broker_url = 'amqp://guest:guest@localhost:5673//'
result_backend = 'rpc://'
default_exchange = Exchange('default', type='direct')
test_exchange = Exchange('test_exchange', type='topic')
task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('test_queue', test_exchange, routing_key='test123test')
)
task_routes = {
'celery_test.tasks.test_method': {
'queue': 'test_queue'
}
}
задачи.py
@app.task
def test_method():
print('test_method')
return 'test_method'
И затем файл, который я использовал для отправки сообщения: send.py
connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5673/'))
channel = connection.channel()
exchange = 'test_exchange'
routing_key = 'test123test'
message = 'Testmessage'
channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
connection.close()






На самом деле это может быть не ответ, а скорее продолжение. Но я подумал, что дам знать людям, которые столкнулись с этой проблемой. (Этот пост — моя интерпретация, и, поскольку я новичок в сельдерее, вам, вероятно, следует отнестись к этому с недоверием.)
Так что в основном причина, по которой я думаю, что это происходит, заключается в том, что Сельдерей не понимает сообщения. Celery требует много заголовков и других свойств, прежде чем он сможет понять, что пытается сделать сообщение.
Эти заголовки можно эмулировать путем обратного проектирования, но я не буду вдаваться в подробности, поскольку есть более простые способы решения приложения, которое я планирую создать.
Если есть кто-то, кто читает это с большим опытом в этом вопросе, не стесняйтесь меня поправлять.
Это для всех, кого это затронет в 2021 году. У меня есть старая служба, использующая celery 3.1.x (назовем ее «устаревшей»), и недавно созданная служба, использующая версию celery 5.0.x (назовем ее «современной»).
В современной кодовой базе метод Celery.signature() используется для создания подписи, затем apply_async() вызывается для вызова задачи в устаревшей версии. Устаревший сельдерей получил мое сообщение, но отбросил его с этой ошибкой:
Received and deleted unknown message. Wrong destination?!?
Я решил проблему, добавив эту строку в файл celeryconfig.py:
task_protocol = 1
Объяснение здесь: https://github.com/celery/celery/issues/3675#issuecomment-294129297