Я пытаюсь выяснить, возможно ли хранить сообщения в обмене RabbitMQ, даже если потребитель не работает.
Я понял (возможно, неправильно), что для этого обмен должен быть «устойчивым», а также очередь, и сообщение должно быть отправлено с «постоянным» флагом.
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
Моя основная цель — сохранить все сообщения в обмене, чтобы в случае, если по какой-либо причине ни один потребитель не запущен, когда я запускаю его, все сообщения в обмене могли быть направлены в связанную очередь. Я объявляю свои обмены и очередь следующим образом:
//Sender.php
public function sendToQueue(ActionMessage $message)
{
$headers = [
'content-type' => 'application/json',
'timestamp' => $message->getCreatedAt()->getTimestamp(),
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
$channel = $this->connection->getChannel();
$channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
$qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
$channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
return true;
}
//Receiver.php
public function consume($callbackFunction)
{
$channel = $this->messenger->getChannel();
$channel->exchange_declare($this->exchange, 'direct', false, true, false);
list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
$channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);
$channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$this->messenger->close();
}
Я буду признателен за любую помощь (даже просто отказаться от этой идеи и вставить между ними какое-то хранилище). Спасибо.
Если у вас есть автоматическое удаление, очередь удаляется, когда нет потребителей, если у вас нет подтверждения, сообщения проходят прямо через очередь без ожидания. (долговечность может быть необязательной, но это неплохая идея). Это только что пришло мне в голову, но я почти уверен, что это вызовет проблемы, если они будут установлены неправильно по очевидным причинам. То же самое, вероятно, верно и для обмена, но я никогда не пытался «держать» сообщения в обмене.
одна вещь, которую я сделал, это то, что все мои воркеры получают дополнительную очередь (автоматическое удаление/без подтверждения) с разветвленным обменом, которую я использую для отправки командных сообщений воркерам. Например, я могу отправить им команду consume:queue_name, и они это сделают. Также это дает им очередь по умолчанию и способ сообщить им о новых очередях. По сути, всякий раз, когда клиент отправляет задание, оно помещается в специфическую для него очередь, в основном для того, чтобы они не перехватывали всех рабочих, потому что все их сообщения помещаются в одну очередь только для них. Тогда рабочие просто круговой перебор. и т.д. Таким образом, это органически предотвращает это.
@ArtisticPhoenix большое спасибо, я проверю.






Обмен не хранит сообщения, это работа очереди. Проблема заключается не в том, что потребители не запущены, а в том, что очереди не существует, потому что вы предоставили потребителям возможность объявлять свои собственные очереди.
Если вы хотите, чтобы сообщения сохранялись до тех пор, пока их не получит потребитель, вы должны объявить:
direct)Оба они могут быть объявлены в сценарии отправителя, но в большинстве случаев имеет смысл объявить их один раз при развертывании приложения, обрабатывая их так же, как схему базы данных.
Вместо создания анонимной очереди в сценарии Receiver вы можете просто подключиться к именованной очереди и начать получать сообщения, ожидающие там.
Основное различие, которое это будет иметь, заключается в том, как будут взаимодействовать несколько потребителей для одного и того же ключа маршрутизации:
Вы можете найти этот симулятор RabbitMQ полезным для визуализации разницы.
Вы можете обнаружить, что вам действительно нужна смесь:
И последнее замечание: в RabbitMQ есть два механизма возврата к обработке разные для сообщений, которые не могут быть обработаны:
AE может быть полезен в вашем примере, если вы на самом деле не хотите нормально обрабатывать пропущенные сообщения, вы просто хотите их обнаружить, например. перечисление их в журнале ошибок.
RabbitMQ exchange- Я думаю, вы имеете в видуqueueобмен - это средство доставки, такое какtopicилиfanoutи т. д. Способ распространения сообщений. Если вы сделаете очередь устойчивой, без автоматического удаления и требующего ACK, вы можете помещать в нее сообщения без потребителей.