RabbitMQ хранит сообщения в обмен

Я пытаюсь выяснить, возможно ли хранить сообщения в обмене RabbitMQ, даже если потребитель не работает.

Я понял (возможно, неправильно), что для этого обмен должен быть «устойчивым», а также очередь, и сообщение должно быть отправлено с «постоянным» флагом.

'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT

Моя основная цель — сохранить все сообщения в обмене, чтобы в случае, если по какой-либо причине ни один потребитель не запущен, когда я запускаю его, все сообщения в обмене могли быть направлены в связанную очередь. Я объявляю свои обмены и очередь следующим образом:

//Sender.php
public function sendToQueue(ActionMessage $message)
    {
        $headers = [
            'content-type' => 'application/json',
            'timestamp' => $message->getCreatedAt()->getTimestamp(),
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
        ];
        $channel = $this->connection->getChannel();
        $channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
        $qMessage = new AMQPMessage(json_encode($message->toArray()), $headers);
        $channel->basic_publish($qMessage, $this->exchangeName, $message->getTopic());
        return true;
    }
//Receiver.php
public function consume($callbackFunction)
        {
            $channel = $this->messenger->getChannel();
            $channel->exchange_declare($this->exchange, 'direct', false, true, false);
            list($queueName, ,) = $channel->queue_declare('', false, true, true, false);
            $channel->queue_bind($queueName, $this->exchangeName, $this->topicAction);

            $channel->basic_consume($queueName, '', false, true, false, false, $callbackFunction);

            while (count($channel->callbacks)) {
                $channel->wait();
            }

            $channel->close();
            $this->messenger->close();
        }

Я буду признателен за любую помощь (даже просто отказаться от этой идеи и вставить между ними какое-то хранилище). Спасибо.

RabbitMQ exchange - Я думаю, вы имеете в виду queue обмен - это средство доставки, такое как topic или fanout и т. д. Способ распространения сообщений. Если вы сделаете очередь устойчивой, без автоматического удаления и требующего ACK, вы можете помещать в нее сообщения без потребителей.
ArtisticPhoenix 22.03.2019 12:10

Если у вас есть автоматическое удаление, очередь удаляется, когда нет потребителей, если у вас нет подтверждения, сообщения проходят прямо через очередь без ожидания. (долговечность может быть необязательной, но это неплохая идея). Это только что пришло мне в голову, но я почти уверен, что это вызовет проблемы, если они будут установлены неправильно по очевидным причинам. То же самое, вероятно, верно и для обмена, но я никогда не пытался «держать» сообщения в обмене.

ArtisticPhoenix 22.03.2019 12:12

одна вещь, которую я сделал, это то, что все мои воркеры получают дополнительную очередь (автоматическое удаление/без подтверждения) с разветвленным обменом, которую я использую для отправки командных сообщений воркерам. Например, я могу отправить им команду consume:queue_name, и они это сделают. Также это дает им очередь по умолчанию и способ сообщить им о новых очередях. По сути, всякий раз, когда клиент отправляет задание, оно помещается в специфическую для него очередь, в основном для того, чтобы они не перехватывали всех рабочих, потому что все их сообщения помещаются в одну очередь только для них. Тогда рабочие просто круговой перебор. и т.д. Таким образом, это органически предотвращает это.

ArtisticPhoenix 22.03.2019 12:19

@ArtisticPhoenix большое спасибо, я проверю.

rayvr 22.03.2019 15:12
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Symfony Station Communiqué - 7 июля 2023 г
Symfony Station Communiqué - 7 июля 2023 г
Это коммюнике первоначально появилось на Symfony Station .
Оживление вашего приложения Laravel: Понимание режима обслуживания
Оживление вашего приложения Laravel: Понимание режима обслуживания
Здравствуйте, разработчики! В сегодняшней статье мы рассмотрим важный аспект управления приложениями, который часто упускается из виду в суете...
Установка и настройка Nginx и PHP на Ubuntu-сервере
Установка и настройка Nginx и PHP на Ubuntu-сервере
В этот раз я сделаю руководство по установке и настройке nginx и php на Ubuntu OS.
Коллекции в Laravel более простым способом
Коллекции в Laravel более простым способом
Привет, читатели, сегодня мы узнаем о коллекциях. В Laravel коллекции - это способ манипулировать массивами и играть с массивами данных. Благодаря...
Как установить PHP на Mac
Как установить PHP на Mac
PHP - это популярный язык программирования, который используется для разработки веб-приложений. Если вы используете Mac и хотите разрабатывать...
2
4
1 629
1

Ответы 1

Обмен не хранит сообщения, это работа очереди. Проблема заключается не в том, что потребители не запущены, а в том, что очереди не существует, потому что вы предоставили потребителям возможность объявлять свои собственные очереди.

Если вы хотите, чтобы сообщения сохранялись до тех пор, пока их не получит потребитель, вы должны объявить:

  • Обмен, который «Отправитель» опубликует
  • Именованная очередь, прикрепленная к этому обмену для каждого типа сообщений, которые будут потребляться отдельно (по одной для каждого ключа маршрутизации, если используется обмен direct)

Оба они могут быть объявлены в сценарии отправителя, но в большинстве случаев имеет смысл объявить их один раз при развертывании приложения, обрабатывая их так же, как схему базы данных.

Вместо создания анонимной очереди в сценарии Receiver вы можете просто подключиться к именованной очереди и начать получать сообщения, ожидающие там.

Основное различие, которое это будет иметь, заключается в том, как будут взаимодействовать несколько потребителей для одного и того же ключа маршрутизации:

  • Несколько очередей, прикрепленных к одному обмену, как в вашем существующем коде, создают несколько копий каждого сообщения. Это полезно, если у вас есть разные потребители, которые делают разные вещи с одними и теми же сообщениями.
  • Несколько потребителей, подключенных к одной очереди, как я предложил выше, будут Поделиться сообщений, причем каждое из них будет обрабатываться другим потребителем по существу случайным образом. Это полезно, если у вас есть несколько идентичных потребителей для обработки большого количества сообщений.

Вы можете найти этот симулятор RabbitMQ полезным для визуализации разницы.

Вы можете обнаружить, что вам действительно нужна смесь:

  • Предварительно объявите очередь для каждого потребителя с должен видеть каждое сообщение, чтобы гарантировать сохранение копии каждого сообщения до тех пор, пока этот конкретный потребитель не будет готов его прочитать.
  • Объявите дополнительные временные очереди в дополнительных потребителях, чтобы собирать дополнительная копия сообщений по мере их поступления.

И последнее замечание: в RabbitMQ есть два механизма возврата к обработке разные для сообщений, которые не могут быть обработаны:

  • Альтернативный обмен захватывает сообщения, которые будут отброшены при обмене (из-за отсутствия соответствующей очереди).
  • Обмен недоставленными письмами захватывает сообщения, которые будут отброшены из очередь (например, из-за того, что они были отклонены потребителем или достигнуто настроенное время ожидания).

AE может быть полезен в вашем примере, если вы на самом деле не хотите нормально обрабатывать пропущенные сообщения, вы просто хотите их обнаружить, например. перечисление их в журнале ошибок.

Другие вопросы по теме