Я разрабатываю топологию apache storm (используя streamparse), построенную с одним носиком (носик apache kafka) и 1 болтом с параллелизмом> 1, который читает сообщения в пакетном режиме из носика kafka и сохраняет сообщения в таблице mysql.
Болт читает сообщения пачками. Если пакет завершится успешно, я вручную зафиксирую смещение apache kafka.
Когда вставка болта в mysql терпит неудачу, я не фиксирую смещение в kafka, но некоторые сообщения уже находятся в очереди сообщений, которые носик отправил болту.
Сообщения, которые уже находятся в очереди, должны быть удалены, потому что я не могу увеличить смещение kafka без потери предыдущих ошибочных сообщений.
Есть ли способ в streamparse очистить или сбросить все сообщения, которые уже находятся в очереди при запуске болта?

Я не знаю о streamparse, но у меня сложилось впечатление, что вы хотите связать кортежи и написать их как пакет. Допустим, вы записали до смещения 10. Теперь ваш болт получает смещение 11-15, и пакет не может быть записан. Смещение 15-20 поставлено в очередь, и вы не хотите обрабатывать их прямо сейчас, потому что это приведет к обработке пакетов не по порядку.
Правильно ли это понимание?
Во-первых, я бы отказался от ручной фиксации смещений. Вы должны позволить носику справиться с этим. Предполагая, что вы используете storm-kafka-client, вы можете настроить его для фиксации смещений только после подтверждения соответствующего кортежа и всех предыдущих кортежей.
Что вам, вероятно, следует сделать, так это отслеживать в болте (или, что еще лучше, в вашей базе данных), какое наибольшее смещение было в неудачной партии. Затем, когда ваш болт не может записать смещение 11-15, вы можете сделать так, чтобы болт терпел неудачу в каждом кортеже с помощью offset > 15. В какой-то момент вы снова получите смещение 11-15 и сможете повторить попытку записи пакета. Поскольку вы отклонили все сообщения с помощью offset > 15, они также будут отправлены повторно и будут доставлены после сообщений в неудавшемся пакете.
Это решение предполагает, что вы не выполняете переупорядочивание потока сообщений между носиком и вашим записывающим болтом, поэтому сообщения поступают в болт в том порядке, в котором они были отправлены.