У меня есть связь mqtt, и устройство «A» выдает сообщение с темой ex1.ex2.ex3, а устройство «B», подписавшееся на тему ex1.ex2.ex3, по какой-то причине не может использовать это сообщение. Есть ли способ заставить это сообщение, выданное устройством «А», перейти в определенную «очередь» через «определенное время» без использования сообщения? Я знаю, что в коммуникации mqtt нет концепции очередей, но я хочу реализовать истечение срока действия сообщений и обработку недоставленных писем в более ранней версии mqtt V5.
Я пробовал установить TTL для всех очередей и обменов с помощью таких настроек политики.
sudo rabbitmqctl set_policy TTL ".*" '{"message-ttl":3000, "dead-letter-exchange":"DeadLetterTestExchange", "dead-letter-routing-key":"test.deadletter.queue"}' --apply-to queues
, но есть еще трудности. Связь mqtt осуществляется с помощью qos 0, и я хотел бы узнать подробный метод. Даже если я проверю официальный документ Rabbitmq, я не вижу обработки истечения срока действия сообщения или формулировки, связанной с DeadLetter в mqtt.
Использование публикаций и подписок QoS-0 приводит к созданию очередей, которые не могут использовать политики недоставленных сообщений с настройками RabbitMQ по умолчанию. Решив эту проблему, вы рискуете столкнуться со второй проблемой, связанной с циклами сообщений, если вы пытаетесь использовать сообщения с DLQ в том же соединении, которое принимает сообщения без DLQ. Более подробная информация ниже:
Во-первых, вы подписываетесь на свою тему с MQTT QoS 0. Под капотом RabbitMQ привязывает временную очередь к значению по умолчанию mqtt.exchange
(обычно amq.topic
) для вашей подписки. Если вы подписываетесь с QoS 0, созданная временная очередь относится к специальному типу, к которому политики не могут применяться. Из документации:
Тип очереди MQTT QoS 0 можно рассматривать как «псевдо» или «виртуальную» очередь: он сильно отличается от других типов очередей (классических очередей, очередей кворума и потоков) в том смысле, что этот новый тип очереди не является ни отдельный процесс Erlang и не сохраняет сообщения на диске. Вместо этого этот тип очереди является подмножеством почтового ящика процесса Erlang. Сообщения MQTT напрямую отправляются в процесс подключения MQTT подписывающегося клиента. Другими словами, сообщения MQTT отправляются любым «онлайн» подписчикам MQTT.
Побочным эффектом этой особенности является то, что политики не применяются к очередям QoS-0.
С другой стороны, политики применяются к очередям QoS-1. Когда вы оформляете подписку MQTT на уровне QoS-1, RabbitMQ создает «обычный» объект очереди (тип которого настраивается: кворум или классический) для подписки и привязывает его к значению по умолчанию mqtt.exchange
.
К очередям QoS-1 обычно применяются политики. Вот пример снимка экрана подписчика QoS-0 и подписчика QoS-1 в пользовательском интерфейсе RabbitMQ при наличии политики deadletter
, которая применяется к очередям с регулярным выражением .*
:
Обратите внимание, что очередь QoS-0 имеет специальный тип rabbit_mqtt_qos0_queue
(который также настраивается) и не получает политику deadletter
.
Вкратце: вам придется подписаться на тему MQTT с QoS-1 и публиковать сообщения с QoS-1.
В качестве альтернативы вы можете заставить очереди QoS-0 нормально получать политики, изменив флаг функции rabbit_mqtt_qos0_queue
RabbitMQ.
Однако это еще не все, что нужно для решения вашей проблемы. Если вы создадите две подписки QoS-1 в одном и том же соединении MQTT, RabbitMQ реализует их как две привязки (по одной на каждую тему MQTT) к одной и той же временной очереди. Если эта очередь имеет политику недоставленных сообщений, маршрутизирующую ее самой (даже косвенно через обмен недоставленными письмами, который маршрутизируется в связанную очередь через другой ключ маршрутизации), RabbitMQ не доставит ей сообщение. Я подозреваю, но не уверен, что это связано с попыткой RabbitMQ предотвратить бесконечный цикл сообщений.
В результате вам придется выбрать один из двух вариантов:
Выполнение последнего также позволяет вам создать политику недоставленных сообщений для исключения очереди второго потребителя (если потребитель DLQ находится на уровне QoS-1, так что политики даже могут применяться к его очереди), предотвращая истечение срока действия сообщений, которые уже были недоставлены. если потребитель недоставленных писем не сможет добраться до них вовремя.
dead-letter-exchange
, соответствующим вашей конфигурации mqtt.exchange
RabbitMQ (по умолчанию — amq.topic
). Регулярное выражение может быть .*
, если вы потребляете сообщения DLQ из подписки QoS-0.
mqtt-subscription-dlqconsumerqos1
)Приведенный ниже Python иллюстрирует, как будет работать потребление сообщений MQTT, обработанных DLQ, с использованием двух отдельных потребительских процессов.
Политика RabbitMQ для истечения срока действия сообщений через 2 секунды:
rabbitmqctl set_policy TTL "mqtt-subscription-dlq_client.*" '{"message-ttl":2000, "dead-letter-exchange":"amq.topic", "dead-letter-routing-key":"dlq"}' --apply-to queues
Основной потребитель сообщений, не относящихся к DLQ, обработка каждого сообщения которого занимает 3 секунды. Этот скрипт также является издателем, хотя публикации могут происходить откуда угодно, если они имеют QoS-1, primary.py
:
from paho.mqtt.client import Client, MQTTMessage, MQTTv5
from paho.mqtt.enums import CallbackAPIVersion
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import time
def on_connect(client: Client, _, __, result, *___):
client.subscribe('non_dlq', qos=1)
client.publish('non_dlq', 'message1', qos=1)
client.publish('non_dlq', 'message2', qos=1)
client.publish('non_dlq', 'message3', qos=1)
print(f"Connected: {result}")
def on_message(client: Client, _, msg: MQTTMessage):
print(f"Primary: Got message from {msg.topic}: {msg.payload}")
# Sleep before acknowledging the message to force other ready messages in the queue to expire and get dead-lettered.
time.sleep(3)
client = Client(CallbackAPIVersion.VERSION2, client_id = "main_client", protocol=MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
# Connect the main consumer with a maximum of 1 local messages to force dead-lettering to occur correctly: RabbitMQ's
# expiration only applies to messages that have not been claimed by a consumer.
connect_properties = Properties(PacketTypes.CONNECT)
connect_properties.ReceiveMaximum = 1
client.connect("localhost", properties=connect_properties)
client.loop_forever()
Потребитель DLQ, dlq.py
:
from paho.mqtt.client import Client, MQTTMessage, MQTTv5
from paho.mqtt.enums import CallbackAPIVersion
def on_connect(client: Client, _, __, result, *___):
client.subscribe('dlq', qos=1)
print(f"Connected: {result}")
def on_message(client: Client, _, msg: MQTTMessage):
print(f"DLQ: Got message from {msg.topic}: {msg.payload}")
client = Client(CallbackAPIVersion.VERSION2, client_id = "dlq_client", protocol=MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost")
client.loop_forever()
Примените политику RabbitMQ, сначала запустите dlq.py
, затем запустите primary.py
. Результат primary.py
должен быть:
Connected: Success
Primary: Got message from non_dlq: b'message1'
Это указывает на то, что он получил сообщение 1 из 3 из основной темы, не относящейся к DLQ.
Результат dlq.py
должен быть:
Connected: Success
DLQ: Got message from dlq: b'message2'
DLQ: Got message from dlq: b'message3'
Это указывает на то, что он получил второе и третье сообщения из очереди недоставленных сообщений.