У нас есть рабочий rabbitmq.implementation, из-за объёма планируем перейти на kafka.
У меня есть сомнения в одном месте.
В RabbitMQ, когда потребитель получает сообщение от Q, сообщение переходит на другой этап, неподтвержденный этап. клиенту/потребителю требуется некоторое время для обработки сообщения, после успешного процесса он отправляет подтверждение на Q, и сообщение удаляется из Q. в случае неудачи, после определенного периода, если Q не получает подтверждение, сообщение добавлен в конце Q . Таким образом, мы не теряем ни одного сообщения.
С моими небольшими познаниями в Kafka я понимаю, что если например сообщение 100 не было успешно обработано, смещение не было увеличено, но оно будет увеличено, если сообщение 101 будет обработано успешно. Поэтому я потерял сообщение 100.
Есть ли способ гарантировать, что ни одно из сообщений не будет потеряно?
Почему бы не рассмотреть Solace как более близкую замену Rabbit для обмена сообщениями? Похожие API + семантика (например, ACK для каждого сообщения), но лучшая обработка объема.

Смещение вашего сообщения не будет увеличено, если вы не опросите новые сообщения. Таким образом, вы должны быть обеспокоены повторной обработкой вашего сообщения.
Если вы хотите сохранить результат обработки ваших данных в кластере Kafka, вы можете использовать транзакционная функция Kafka. Таким образом, вы можете поддержать доставку ровно один раз. Все ваши изменения будут сохранены или ни одно из них не будет сохранено.
Другой подход состоит в том, чтобы сделать ваш сценарий обработки идемпотентным. Вы назначите уникальный идентификатор для каждого сообщения в Kafka. Когда вы обрабатываете сообщение, вы сохраняете идентификатор в базе данных. После сбоя вы проверяете, что ваш идентификатор сообщения уже обработан, просматривая базу данных.
Kafka не удаляет сообщения из тем, если они не достигают одной из log.retention.byteslog.retention.hourslog.retention.minuteslog.retention.ms конфигов. поэтому, если смещение увеличивается, вы не теряете предыдущие сообщения, и вы можете просто изменить смещение на желаемую позицию.
@admin, мой потребитель не отслеживал, а у нас есть сотни клиентов, потребляющих сообщения RabbitMQ. Они потребляют и подтверждают обратно для каждого сообщения. а RabitMQ позаботится об удалении успешно обработанного сообщения и повторной публикации в Q неудачных сообщений. Теперь, если смещение продвинулось вперед для клиента без обработки сообщения, как клиент снова использует то же сообщение, без изменений на стороне клиента.
Вы должны немного прочитать о том, как работает потребление сообщений в Kafka. Вот ссылка на потребительский раздел официальной документации Kafka: https://kafka.apache.org/documentation/#theconsumer
По сути, в Kafka сообщения удаляются только по прошествии достаточного времени, и это настраивается с помощью log.retention.hours, log.retention.minutes и log.retention.ms, как сказал @Amin.
В Kafka любое количество потребителей может начать потреблять сообщения из любой темы в любой момент, независимо от того, потребляют ли уже другие потребители из той же темы. Kafka отслеживает, где находится каждый потребитель в каждой теме/разделе, используя смещения, которые хранятся в самой Kafka. Итак, если вашему потребителю нужно использовать сообщение 100, как вы описали в своем вопросе, вы можете просто «перемотать» на нужное сообщение и снова начать нормально потреблять. Не имеет значения, потребляли ли вы его ранее, или читают ли другие потребители эту тему или нет.
Из официальных документов Кафки:
A consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but turns out to be an essential feature for many consumers. For example, if the consumer code has a bug and is discovered after some messages are consumed, the consumer can re-consume those messages once the bug is fixed.
Спасибо @mjuarez, мой сценарий немного отличается, позвольте мне объяснить, поэтому у меня есть только один потребитель / читатель для каждой темы, если у меня их несколько, они предназначены для параллелизма, поэтому они будут читать / потреблять разные сообщения. Но потребитель/читатель, когда не может обработать сообщение и перенаправляется к следующему, может быть, сообщение все еще останется в kafka, но оно будет продолжать увеличивать смещение, читая только новые сообщения. SO тот, который не удалось, не будет обработан опять таки . Для RabbitMq это было легко, так как он переместит сообщение обратно в Q . Kafka сохраняет одно смещение для каждого потребителя, RabbitMQ — для каждого сообщения.
Просто настройте свой клиент для чтения сообщений kafka по одному за раз и фиксируйте смещения только после того, как вы успешно обработаете каждое сообщение, и тогда у вас будет поведение, аналогичное rabbitmq (только быстрее при использовании тематических разделов для сегментирования и нескольких потребителей в общей группе потребителей). ) Это будет медленнее, чем в обычном режиме, когда клиенты Kafka потребляют сообщения пакетами, но все равно быстрее.
Кроме того, клиенты kafka обычно сохраняют одно смещение на раздел для каждой группы потребителей, поэтому тема с 16 разделами и 4 потребителями в одной группе потребителей будет содержать 16 смещений (по одному на раздел).
@ Ганс, это рабочее решение, но в конечном итоге это замедлит процесс. Есть ли альтернатива, иначе мы должны придерживаться старого RabbitMq
Альтернативой является то, что вы опрашиваете пакеты сообщений Kafka и управляете своими собственными смещениями. Если вам нужно пропустить сообщение, вы записываете его смещения в очередь мертвых сообщений или во внешнюю систему, отличную от Kafka, для последующей повторной обработки.
Я также столкнулся с тем же вопросом. Если я хочу выразиться проще, RabbitMQ ведет подсчет каждого
Кафки нет, так что вы не можете сделать это в готовом виде, вы должны реализовать его сами.
Однако есть варианты, используйте kmq, производительность станет меньше 50%, посмотрите
https://softwaremill.com/kafka-with-selective-acknowledgments-performance/
Вам нужно будет реализовать очередь недоставленных писем (DLQ).