Смещение коммита Apache kafka в топологии apache storm

Я разрабатываю топологию apache storm (используя streamparse), построенную с одним носиком (носик apache kafka) и 1 болтом с параллелизмом> 1, который читает сообщения в пакетном режиме из носика kafka и сохраняет сообщения в таблице mysql.

Болт читает сообщения пачками. Если пакет завершится успешно, я вручную зафиксирую смещение apache kafka.

Когда вставка болта в mysql терпит неудачу, я не фиксирую смещение в kafka, но некоторые сообщения уже находятся в очереди сообщений, которые носик отправил болту.

Сообщения, которые уже находятся в очереди, должны быть удалены, потому что я не могу увеличить смещение kafka без потери предыдущих ошибочных сообщений.

Есть ли способ в streamparse очистить или сбросить все сообщения, которые уже находятся в очереди при запуске болта?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
437
1

Ответы 1

Я не знаю о streamparse, но у меня сложилось впечатление, что вы хотите связать кортежи и написать их как пакет. Допустим, вы записали до смещения 10. Теперь ваш болт получает смещение 11-15, и пакет не может быть записан. Смещение 15-20 поставлено в очередь, и вы не хотите обрабатывать их прямо сейчас, потому что это приведет к обработке пакетов не по порядку.

Правильно ли это понимание?

Во-первых, я бы отказался от ручной фиксации смещений. Вы должны позволить носику справиться с этим. Предполагая, что вы используете storm-kafka-client, вы можете настроить его для фиксации смещений только после подтверждения соответствующего кортежа и всех предыдущих кортежей.

Что вам, вероятно, следует сделать, так это отслеживать в болте (или, что еще лучше, в вашей базе данных), какое наибольшее смещение было в неудачной партии. Затем, когда ваш болт не может записать смещение 11-15, вы можете сделать так, чтобы болт терпел неудачу в каждом кортеже с помощью offset > 15. В какой-то момент вы снова получите смещение 11-15 и сможете повторить попытку записи пакета. Поскольку вы отклонили все сообщения с помощью offset > 15, они также будут отправлены повторно и будут доставлены после сообщений в неудавшемся пакете.

Это решение предполагает, что вы не выполняете переупорядочивание потока сообщений между носиком и вашим записывающим болтом, поэтому сообщения поступают в болт в том порядке, в котором они были отправлены.

Другие вопросы по теме