Я надеюсь, что кто-то может дать мне подсказку, что я делаю неправильно здесь. Я написал собственный обработчик ошибок для пакетного прослушивателя, который должен искать за полученными записями, отправлять их в dlq. Я сильно устал, но не могу заставить его работать. Моя текущая реализация будет зависать в бесконечном цикле, получая записи снова и снова. Вот код обработчика ошибок:
@Service("consumerAwareListenerErrorHandlerImpl")
public class ConsumerAwareListenerErrorHandlerImpl implements ConsumerAwareListenerErrorHandler {
private final Executor executor;
private final KafkaListenerEndpointRegistry registry;
private final TaskScheduler scheduler;
@Autowired
public ConsumerAwareListenerErrorHandlerImpl(KafkaListenerEndpointRegistry registry, TaskScheduler scheduler) {
this.scheduler = scheduler;
this.executor = new SimpleAsyncTaskExecutor();
this.registry = registry;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
MessageHeaders headers = message.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Acknowledgment acknowledgment = headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> (v == null) ? offsets.get(index) : Math.max(v, offsets.get(index)));
}
offsetsToReset.forEach((k, v) -> consumer.seek(k, v));
if (!(exception.getCause() instanceof DeserializationException)) {
//pauseAndRestartContainer();
}
acknowledgment.acknowledge();
consumer.commitSync();
return null;
}




Вы должны стремиться к смещению + 1, чтобы пройти «прошлое». Поиск offset приведет к повторному воспроизведению.
Мы добавили ErrorHandlingDeserializer2 (...2 лучше, чем первый, потому что он безопасен для типов) именно по этой причине. Проблема в том, что исключения десериализации возникают до того, как Spring получит данные, поэтому мы больше ничего не можем сделать.
Но я все еще должен фильтровать их в контейнере, как кажется?!
Да, при использовании пакетного прослушивателя; однако теперь это более удобно, потому что значение является типобезопасным (либо null, либо фиктивный десериализованный объект, если вы предоставляете failedDeserializationFunction). При использовании прослушивателя на основе записей неудачная запись отправляется непосредственно в обработчик ошибок контейнера.
Еще один вопрос, Гэри Рассел: если я использую ErrorHandlingDeserializer2, отправитель должен использовать тот же класс для отправки json моему прослушивателю контейнера. В противном случае я получаю «org.springframework.messaging.converter.MessageConversionException: не удалось разрешить имя класса». Если я не использую ErrorHandlingDeserializer2, отправитель может отправить любой json, если он совместим с целевым d, чтобы он был десериализован без каких-либо исключений. У вас есть подсказка для меня, как решить эту проблему?
Хорошо, я узнал, что могу сделать это, используя; props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, ложь);
Спасибо за подсказку! У меня также есть проблема, что ошибка hanlde не вызывается, когда возникает исключение десериализации. Конечно, я могу использовать ErrorHandlingDeserializer и вручную фильтровать ошибочные записи, но это мне не помогает. Я пытаюсь: Исключение сериализации: log; искать прошлое; продолжать идти; Другое исключение: пауза; перезапуск по расписанию; попробуйте еще раз У вас есть идеи, как решить эту проблему для пакетного прослушивателя?