Как безопасно пропускать сообщения с помощью Lagom Kafka Message Broker API?

Мы определили базового подписчика, который пропускает ошибочные сообщения (т. е. по какой-то причине бизнес-логики, которую мы не собираемся обрабатывать), выбрасывая исключение и полагаясь на контроль потока Akka Streams для возобновления Flow:

someLagomService
  .someTopic()
  .subscribe
  .withGroupId("lagom-service")
  .atLeastOnce(
    Flow[Int]
      .mapAsync(1)(el => {
        // Exception may occur here or can map to Done
      })
      .withAttributes(ActorAttributes.supervisionStrategy({
        case t =>
          Supervision.Resume
      })
  )

Кажется, это хорошо работает для основных случаев использования при очень небольшой нагрузке, но мы заметили очень странные вещи для большого количества сообщений (например, очень частая повторная обработка сообщений и т. д.).

Покопавшись в коде, мы увидели, что в документации Lagom broker.Subscriber.atLeastOnce говорится:

The flow may pull more elements from upstream but it must emit exactly one Done message for each message that it receives. It must also emit them in the same order that the messages were received. This means that the flow must not filter or collect a subset of the messages, instead it must split the messages into separate streams and map those that would have been dropped to Done.

Кроме того, в реализации Lagom KafkaSubscriberActor мы видим, что реализация private atLeastOnce по существу распаковывает полезную нагрузку и смещение сообщения, а затем снова архивирует и создает резервную копию после того, как наш пользовательский поток сопоставляет сообщения с Done.

Эти два лакомых кусочка выше, похоже, подразумевают, что, используя супервизоры потоков и пропуская элементы, мы можем оказаться в ситуации, когда коммитируемые смещения больше не будут синхронизироваться равномерно с Done, которые должны быть созданы для каждого сообщения Kafka.

Пример: если мы транслируем 1, 2, 3, 4 и сопоставляем 1, 2 и 4 с Done, но выбрасываем исключение для 3, у нас есть 3 Done и 4 фиксируемых смещения?

  • Это правильно/ожидается? Означает ли это, что мы должны ИЗБЕГАТЬ использования здесь супервизоров потоков?
  • Какое поведение может быть вызвано неравномерным сжатием?
  • Каков рекомендуемый подход к обработке ошибок, когда речь идет об использовании сообщений из Kafka через API брокера сообщений Lagom? Правильно ли сопоставлять / восстанавливать сбои в Done?

Использование Лагом 1.4.10

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
0
472
1

Ответы 1

Is this correct / expected? Does this mean we should AVOID using stream supervisors here?

Официальный Документация по API говорит, что

If the Kafka Lagom message broker module is being used, then by default the stream is automatically restarted when a failure occurs.

Таким образом, нет необходимости добавлять свои собственные supervisionStrategy для управления обработкой ошибок. А поток будет перезапущен по умолчанию и не стоит думать о "пропущенных" сообщениях Done.


What sorts of behavior can the uneven zipping cause?

Именно из-за этого в документации написано:

This means that the flow must not filter or collect a subset of the messages

Он может зафиксировать неправильное смещение. И при перезапуске вы можете получить уже обработанные сообщения в виде повтора с зафиксированного нижнего смещения.


What is the recommended approach for error handling when it comes to consuming messages off of Kafka via the Lagom message broker API? Is the right thing to do to map / recover failures to Done?

Lagom позаботится об обработке исключений, отбрасывая сообщение, вызвавшее ошибку, и перезапуская поток. И сопоставление/восстановление сбоев в состояние «Готово» не повлияет на это.

Вы можете подумать, если вам понадобится доступ к этим сообщениям позже, также использовать, например, Try {}, т.е. не генерировать исключение, и собирать сообщения с ошибками, отправляя их в другую тему, это даст вам возможность отслеживать количество ошибок и повторных сообщений, вызвавших ошибку при правильных условиях, т. е. ошибка исправлена.

Спасибо за подтверждение re: under-commits! Re: Lagom обрабатывает исключения и отбрасывает сообщение. Мы заметили, что, хотя он перезапускает поток, обрабатывающий сообщение, он никогда не отправляет фиксацию в Kafka, поэтому после перезапуска он снова и снова повторяет отравленное сообщение. Определенно согласен, что это была бы хорошая стратегия, чтобы в конечном итоге отправлять сообщения «мертвых писем» в разные темы, как вы упомянули, но наш временный обходной путь состоял в том, чтобы recover и отображать ошибки на Done, чтобы прочистить каналы.

simonl 12.03.2019 17:01

Спасибо за объяснение. Я ожидаю, что сообщение будет полностью удалено.

Ivan Stanislavciuc 12.03.2019 17:06

Другие вопросы по теме