Максимальная длина очереди обмена RabbitMQ

Я пытаюсь применить некоторое управление потоком в своем приложении, основанном на RabbitMQ.

Очень краткий обзор моей системы:

  • Есть несколько синих рабочих, которые сканируют и вводят каталог и публикуют сообщения на обмен.
  • Есть и другие ред-воркеры, которые потребляют из этого обмена (на основе ключей маршрутизации) и делают с данными все, что они делают, а затем удаляют их.

Данные, которые «хранятся» на бирже, довольно велики, и время, которое требуется рабочему для их обработки, заметно. Через некоторое время я получаю предупреждение о памяти от RabbitMQ, в котором говорится, что использование памяти слишком велико, и все операции публикации остановлены.

Я попытался увеличить объем памяти, который использует Rabbitmq, но это просто отложило проблему на несколько часов (время выполнения). Я также сделал очереди дисковыми, а не оперативными, но вместо этого мой диск был заполнен.

Поскольку мой ввод не такой большой, я могу жить с «большой» очередью ввода, из которой синие рабочие считывают свой ввод. Вот я и подумал попробовать установить некую "максимальную длину" на линк между синими рабочими и биржей. Я думаю, что ничего не потеряю здесь, так как реальным узким местом моей системы являются редворкеры (кстати я задекларировал связь между редворкерами и обменом с prefech_count=2).

После всего этого... Мне не удалось применить такую ​​максимальную длину :( Я использую Pika для объявления своих очередей и работы с каналами. Я прочитал это (https://www.rabbitmq.com/maxlength.html), но не смог реализовать его в своем коде, и я хотел бы увидеть пример, в котором используется этот флаг максимального размера.

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
1 582
1

Ответы 1

Обмены не хранят сообщения: их хранят очереди.

Если вы установите максимальную длину очереди, установив соответствующий arguments в вызове queue_declare(), сообщения, которые не поместятся в нее, будут удалены.

Однако издатель сможет публиковать сообщения на бирже, не замечая этого.

Производитель может получить уведомление об их сбое, используя мертвая буква.

Если вам нужна помощь с кодом, включите код в вопрос.

О, я вижу. Теперь мне удалось сделать это, используя аргумент «x-max-length». Однако я не могу потерять сообщения, поэтому я также попытался добавить аргумент {'x-overflow': 'reject-publish'}, но возвращаемое значение channel.basic_publish всегда равно True, хотя я вижу, что сообщения не продолжают полный конвейер. У вас есть идеи, почему?

Andy Thomas 06.02.2019 13:41

Вы включили издатель подтверждает? из документации про переполнение тот конфиг только указал rabbitmq какие сообщения выбрасывать, новые вместо старых. Если вы получите basic.nack, издатель будет уведомлен о том, что сообщение не достигло места назначения (очереди). Я бы предположил, основываясь на вашем описании, что вы реализовали какое-то ожидание на стороне издателя, чтобы попробовать его снова после задержки?

Olivier 06.02.2019 15:14

@AndyThomas, потому что сообщения всегда удается опубликовать, если обмен доступен. Если ваш обменник не имеет привязанных к нему очередей, сообщения теряются. Если очередь заполнена, сообщения теряются. Если вы хотите, чтобы сообщения никогда не терялись, вам нужно использовать недоставленные сообщения или какую-то обратную связь от потребителя к производителю, чтобы производитель замедлился/подождал. Но это слишком абстрактно — добавьте небольшой рабочий пример, чтобы мы могли вам помочь.

Sigi 06.02.2019 16:04

@Сигизмондо Спасибо! Я думаю, что действительно реализую какой-то механизм обратной связи, как вы сказали.

Andy Thomas 10.02.2019 13:19

Другие вопросы по теме