Как я могу включить обработку DeadLetter в mqtt-связь RabbitMQ?

У меня есть связь mqtt, и устройство «A» выдает сообщение с темой ex1.ex2.ex3, а устройство «B», подписавшееся на тему ex1.ex2.ex3, по какой-то причине не может использовать это сообщение. Есть ли способ заставить это сообщение, выданное устройством «А», перейти в определенную «очередь» через «определенное время» без использования сообщения? Я знаю, что в коммуникации mqtt нет концепции очередей, но я хочу реализовать истечение срока действия сообщений и обработку недоставленных писем в более ранней версии mqtt V5.

Я пробовал установить TTL для всех очередей и обменов с помощью таких настроек политики.

sudo rabbitmqctl set_policy TTL ".*" '{"message-ttl":3000, "dead-letter-exchange":"DeadLetterTestExchange", "dead-letter-routing-key":"test.deadletter.queue"}' --apply-to queues

, но есть еще трудности. Связь mqtt осуществляется с помощью qos 0, и я хотел бы узнать подробный метод. Даже если я проверю официальный документ Rabbitmq, я не вижу обработки истечения срока действия сообщения или формулировки, связанной с DeadLetter в mqtt.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Использование публикаций и подписок QoS-0 приводит к созданию очередей, которые не могут использовать политики недоставленных сообщений с настройками RabbitMQ по умолчанию. Решив эту проблему, вы рискуете столкнуться со второй проблемой, связанной с циклами сообщений, если вы пытаетесь использовать сообщения с DLQ в том же соединении, которое принимает сообщения без DLQ. Более подробная информация ниже:

Первая проблема: QoS-0

Во-первых, вы подписываетесь на свою тему с MQTT QoS 0. Под капотом RabbitMQ привязывает временную очередь к значению по умолчанию mqtt.exchange (обычно amq.topic) для вашей подписки. Если вы подписываетесь с QoS 0, созданная временная очередь относится к специальному типу, к которому политики не могут применяться. Из документации:

Тип очереди MQTT QoS 0 можно рассматривать как «псевдо» или «виртуальную» очередь: он сильно отличается от других типов очередей (классических очередей, очередей кворума и потоков) в том смысле, что этот новый тип очереди не является ни отдельный процесс Erlang и не сохраняет сообщения на диске. Вместо этого этот тип очереди является подмножеством почтового ящика процесса Erlang. Сообщения MQTT напрямую отправляются в процесс подключения MQTT подписывающегося клиента. Другими словами, сообщения MQTT отправляются любым «онлайн» подписчикам MQTT.

Побочным эффектом этой особенности является то, что политики не применяются к очередям QoS-0.

С другой стороны, политики применяются к очередям QoS-1. Когда вы оформляете подписку MQTT на уровне QoS-1, RabbitMQ создает «обычный» объект очереди (тип которого настраивается: кворум или классический) для подписки и привязывает его к значению по умолчанию mqtt.exchange.

К очередям QoS-1 обычно применяются политики. Вот пример снимка экрана подписчика QoS-0 и подписчика QoS-1 в пользовательском интерфейсе RabbitMQ при наличии политики deadletter, которая применяется к очередям с регулярным выражением .*:

Обратите внимание, что очередь QoS-0 имеет специальный тип rabbit_mqtt_qos0_queue (который также настраивается) и не получает политику deadletter.

Вкратце: вам придется подписаться на тему MQTT с QoS-1 и публиковать сообщения с QoS-1.

В качестве альтернативы вы можете заставить очереди QoS-0 нормально получать политики, изменив флаг функции rabbit_mqtt_qos0_queue RabbitMQ.

Вторая проблема: циклы сообщений при подписке от одного и того же потребителя.

Однако это еще не все, что нужно для решения вашей проблемы. Если вы создадите две подписки QoS-1 в одном и том же соединении MQTT, RabbitMQ реализует их как две привязки (по одной на каждую тему MQTT) к одной и той же временной очереди. Если эта очередь имеет политику недоставленных сообщений, маршрутизирующую ее самой (даже косвенно через обмен недоставленными письмами, который маршрутизируется в связанную очередь через другой ключ маршрутизации), RabbitMQ не доставит ей сообщение. Я подозреваю, но не уверен, что это связано с попыткой RabbitMQ предотвратить бесконечный цикл сообщений.

В результате вам придется выбрать один из двух вариантов:

  1. При использовании DLQ в том же MQTT-соединении, которое подписывается на недоставленные сообщения, подпишитесь на «основную» тему с QoS-1 и тему DLQ с QoS-0.
  2. Запустите отдельный потребитель MQTT темы DLQ (на любом уровне QoS). Я рекомендую этот подход.

Выполнение последнего также позволяет вам создать политику недоставленных сообщений для исключения очереди второго потребителя (если потребитель DLQ находится на уровне QoS-1, так что политики даже могут применяться к его очереди), предотвращая истечение срока действия сообщений, которые уже были недоставлены. если потребитель недоставленных писем не сможет добраться до них вовремя.

Как избежать этих проблем

  1. Создайте политику RabbitMQ, подобную той, что указана в вашем вопросе, со значением dead-letter-exchange, соответствующим вашей конфигурации mqtt.exchange RabbitMQ (по умолчанию — amq.topic). Регулярное выражение может быть .*, если вы потребляете сообщения DLQ из подписки QoS-0.
    • Если вы потребляете сообщения DLQ по подписке QoS-1 (например, вы сохраняете сообщения после закрытия потребителей вниз ), регулярное выражение должен включать темы MQTT, которые могут содержать сообщения с истекшим сроком действия или устаревшими сообщениями, и следует исключить очереди, которые использует потребитель недоставленных сообщений (очередь имена автоматически генерируются RabbitMQ на основе идентификатора клиента MQTT, получается что-то вроде mqtt-subscription-dlqconsumerqos1)
  2. Подпишитесь на непрочитанные сообщения, используя подписку QoS-1.
  3. Подпишитесь на тему недоставленных сообщений либо по QoS-1 в отдельном соединении MQTT, либо по QoS-0 по тому же соединению, которое подписано на тему, не относящуюся к DLQ.
  4. Публикуйте сообщения в теме без мертвых букв с QoS 1.

Пример. Использование двух процессов для обработки недописанных сообщений.

Приведенный ниже Python иллюстрирует, как будет работать потребление сообщений MQTT, обработанных DLQ, с использованием двух отдельных потребительских процессов.

Политика RabbitMQ для истечения срока действия сообщений через 2 секунды:

rabbitmqctl set_policy TTL "mqtt-subscription-dlq_client.*" '{"message-ttl":2000, "dead-letter-exchange":"amq.topic", "dead-letter-routing-key":"dlq"}' --apply-to queues

Основной потребитель сообщений, не относящихся к DLQ, обработка каждого сообщения которого занимает 3 секунды. Этот скрипт также является издателем, хотя публикации могут происходить откуда угодно, если они имеют QoS-1, primary.py:

from paho.mqtt.client import Client, MQTTMessage, MQTTv5
from paho.mqtt.enums import CallbackAPIVersion
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import time

def on_connect(client: Client, _, __, result, *___):
    client.subscribe('non_dlq', qos=1)
    client.publish('non_dlq', 'message1', qos=1)
    client.publish('non_dlq', 'message2', qos=1)
    client.publish('non_dlq', 'message3', qos=1)
    print(f"Connected: {result}")

def on_message(client: Client, _, msg: MQTTMessage):
    print(f"Primary: Got message from {msg.topic}: {msg.payload}")
    # Sleep before acknowledging the message to force other ready messages in the queue to expire and get dead-lettered.
    time.sleep(3)

client = Client(CallbackAPIVersion.VERSION2, client_id = "main_client", protocol=MQTTv5)
client.on_connect = on_connect
client.on_message = on_message

# Connect the main consumer with a maximum of 1 local messages to force dead-lettering to occur correctly: RabbitMQ's
# expiration only applies to messages that have not been claimed by a consumer.
connect_properties = Properties(PacketTypes.CONNECT)
connect_properties.ReceiveMaximum = 1
client.connect("localhost", properties=connect_properties)
client.loop_forever()

Потребитель DLQ, dlq.py:

from paho.mqtt.client import Client, MQTTMessage, MQTTv5
from paho.mqtt.enums import CallbackAPIVersion

def on_connect(client: Client, _, __, result, *___):
    client.subscribe('dlq', qos=1)
    print(f"Connected: {result}")

def on_message(client: Client, _, msg: MQTTMessage):
    print(f"DLQ: Got message from {msg.topic}: {msg.payload}")

client = Client(CallbackAPIVersion.VERSION2, client_id = "dlq_client", protocol=MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost")
client.loop_forever()

Примените политику RabbitMQ, сначала запустите dlq.py, затем запустите primary.py. Результат primary.py должен быть:

Connected: Success
Primary: Got message from non_dlq: b'message1'

Это указывает на то, что он получил сообщение 1 из 3 из основной темы, не относящейся к DLQ.

Результат dlq.py должен быть:

Connected: Success
DLQ: Got message from dlq: b'message2'
DLQ: Got message from dlq: b'message3'

Это указывает на то, что он получил второе и третье сообщения из очереди недоставленных сообщений.

Другие вопросы по теме