Я пытаюсь применить некоторое управление потоком в своем приложении, основанном на RabbitMQ.
Очень краткий обзор моей системы:
Данные, которые «хранятся» на бирже, довольно велики, и время, которое требуется рабочему для их обработки, заметно. Через некоторое время я получаю предупреждение о памяти от RabbitMQ, в котором говорится, что использование памяти слишком велико, и все операции публикации остановлены.
Я попытался увеличить объем памяти, который использует Rabbitmq, но это просто отложило проблему на несколько часов (время выполнения). Я также сделал очереди дисковыми, а не оперативными, но вместо этого мой диск был заполнен.
Поскольку мой ввод не такой большой, я могу жить с «большой» очередью ввода, из которой синие рабочие считывают свой ввод. Вот я и подумал попробовать установить некую "максимальную длину" на линк между синими рабочими и биржей. Я думаю, что ничего не потеряю здесь, так как реальным узким местом моей системы являются редворкеры (кстати я задекларировал связь между редворкерами и обменом с prefech_count=2).
После всего этого... Мне не удалось применить такую максимальную длину :(
Я использую Pika для объявления своих очередей и работы с каналами.
Я прочитал это (https://www.rabbitmq.com/maxlength.html), но не смог реализовать его в своем коде, и я хотел бы увидеть пример, в котором используется этот флаг максимального размера.





Обмены не хранят сообщения: их хранят очереди.
Если вы установите максимальную длину очереди, установив соответствующий arguments в вызове queue_declare(), сообщения, которые не поместятся в нее, будут удалены.
Однако издатель сможет публиковать сообщения на бирже, не замечая этого.
Производитель может получить уведомление об их сбое, используя мертвая буква.
Если вам нужна помощь с кодом, включите код в вопрос.
Вы включили издатель подтверждает? из документации про переполнение тот конфиг только указал rabbitmq какие сообщения выбрасывать, новые вместо старых. Если вы получите basic.nack, издатель будет уведомлен о том, что сообщение не достигло места назначения (очереди). Я бы предположил, основываясь на вашем описании, что вы реализовали какое-то ожидание на стороне издателя, чтобы попробовать его снова после задержки?
@AndyThomas, потому что сообщения всегда удается опубликовать, если обмен доступен. Если ваш обменник не имеет привязанных к нему очередей, сообщения теряются. Если очередь заполнена, сообщения теряются. Если вы хотите, чтобы сообщения никогда не терялись, вам нужно использовать недоставленные сообщения или какую-то обратную связь от потребителя к производителю, чтобы производитель замедлился/подождал. Но это слишком абстрактно — добавьте небольшой рабочий пример, чтобы мы могли вам помочь.
@Сигизмондо Спасибо! Я думаю, что действительно реализую какой-то механизм обратной связи, как вы сказали.
О, я вижу. Теперь мне удалось сделать это, используя аргумент «x-max-length». Однако я не могу потерять сообщения, поэтому я также попытался добавить аргумент
{'x-overflow': 'reject-publish'}, но возвращаемое значениеchannel.basic_publishвсегда равноTrue, хотя я вижу, что сообщения не продолжают полный конвейер. У вас есть идеи, почему?