Мы определили базового подписчика, который пропускает ошибочные сообщения (т. е. по какой-то причине бизнес-логики, которую мы не собираемся обрабатывать), выбрасывая исключение и полагаясь на контроль потока Akka Streams для возобновления Flow:
someLagomService
.someTopic()
.subscribe
.withGroupId("lagom-service")
.atLeastOnce(
Flow[Int]
.mapAsync(1)(el => {
// Exception may occur here or can map to Done
})
.withAttributes(ActorAttributes.supervisionStrategy({
case t =>
Supervision.Resume
})
)
Кажется, это хорошо работает для основных случаев использования при очень небольшой нагрузке, но мы заметили очень странные вещи для большого количества сообщений (например, очень частая повторная обработка сообщений и т. д.).
Покопавшись в коде, мы увидели, что в документации Lagom broker.Subscriber.atLeastOnce говорится:
The
flowmay pull more elements from upstream but it must emit exactly oneDonemessage for each message that it receives. It must also emit them in the same order that the messages were received. This means that theflowmust not filter or collect a subset of the messages, instead it must split the messages into separate streams and map those that would have been dropped toDone.
Кроме того, в реализации Lagom KafkaSubscriberActor мы видим, что реализация private atLeastOnce по существу распаковывает полезную нагрузку и смещение сообщения, а затем снова архивирует и создает резервную копию после того, как наш пользовательский поток сопоставляет сообщения с Done.
Эти два лакомых кусочка выше, похоже, подразумевают, что, используя супервизоры потоков и пропуская элементы, мы можем оказаться в ситуации, когда коммитируемые смещения больше не будут синхронизироваться равномерно с Done, которые должны быть созданы для каждого сообщения Kafka.
Пример: если мы транслируем 1, 2, 3, 4 и сопоставляем 1, 2 и 4 с Done, но выбрасываем исключение для 3, у нас есть 3 Done и 4 фиксируемых смещения?
Done?Использование Лагом 1.4.10





Is this correct / expected? Does this mean we should AVOID using stream supervisors here?
Официальный Документация по API говорит, что
If the Kafka Lagom message broker module is being used, then by default the stream is automatically restarted when a failure occurs.
Таким образом, нет необходимости добавлять свои собственные supervisionStrategy для управления обработкой ошибок. А поток будет перезапущен по умолчанию и не стоит думать о "пропущенных" сообщениях Done.
What sorts of behavior can the uneven zipping cause?
Именно из-за этого в документации написано:
This means that the flow must not filter or collect a subset of the messages
Он может зафиксировать неправильное смещение. И при перезапуске вы можете получить уже обработанные сообщения в виде повтора с зафиксированного нижнего смещения.
What is the recommended approach for error handling when it comes to consuming messages off of Kafka via the Lagom message broker API? Is the right thing to do to map / recover failures to Done?
Lagom позаботится об обработке исключений, отбрасывая сообщение, вызвавшее ошибку, и перезапуская поток. И сопоставление/восстановление сбоев в состояние «Готово» не повлияет на это.
Вы можете подумать, если вам понадобится доступ к этим сообщениям позже, также использовать, например, Try {}, т.е. не генерировать исключение, и собирать сообщения с ошибками, отправляя их в другую тему, это даст вам возможность отслеживать количество ошибок и повторных сообщений, вызвавших ошибку при правильных условиях, т. е. ошибка исправлена.
Спасибо за объяснение. Я ожидаю, что сообщение будет полностью удалено.
Спасибо за подтверждение re: under-commits! Re: Lagom обрабатывает исключения и отбрасывает сообщение. Мы заметили, что, хотя он перезапускает поток, обрабатывающий сообщение, он никогда не отправляет фиксацию в Kafka, поэтому после перезапуска он снова и снова повторяет отравленное сообщение. Определенно согласен, что это была бы хорошая стратегия, чтобы в конечном итоге отправлять сообщения «мертвых писем» в разные темы, как вы упомянули, но наш временный обходной путь состоял в том, чтобы
recoverи отображать ошибки наDone, чтобы прочистить каналы.