следуя предложениям, данным мне в ответах на мой предыдущий вопрос Как протестировать ConsumerAwareRebalanceListener?, мне удалось настроить ConsumerAwarereBalanceListener с помощью Spring Kafka 2.1.x.
Однако следующий код
Map<TopicPartition, Long> queries = new HashMap<>();
final Long rewindTime = getRewindTime();
for (TopicPartition partition : partitions) {
queries.put(partition, rewindTime);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(queries);
result.forEach((key, value) -> consumer.seek(key, value.offset()));
выдает NullPointerException, если результат запроса по теме пуст. В этом случае value - это null. Трассировка стека выглядит следующим образом.
java.lang.NullPointerException: null
at org.rcardin.MyConsumerAwareRebalancedListener.lambda$onPartitionsAssigned$0(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
at java.util.HashMap.forEach(HashMap.java:1288) ~[na:1.8.0_144]
at org.rcardin.MyConsumerAwareRebalancedListener.onPartitionsAssigned(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:596) ~[spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Я пытаюсь написать тест, цель которого - проверить это поведение. Однако, поскольку исключение происходит в отдельном потоке, а не в основном потоке, я не могу получить такое исключение.
Любое предложение?
Большое спасибо,




Согласно документации offsetsForTimes(), это похоже на ошибку в вашем коде:
* @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
* than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
* such message.
Итак, вам обязательно стоит защитить свой value.offset() с помощью нулевой проверки:
result.entrySet()
.stream()
.filter(e -> e.getValue() != null)
.forEach(e -> consumer.seek(e.getKey(), e.getValue().offset()));
Да, я знаю, что это ошибка :) Однако я пытаюсь написать тест, который может показать мне мою ошибку. Извините, возможно, мой вопрос не так понятен. Вы можете помочь мне в этом?
Покажите, пожалуйста, трассировку стека для этой ошибки. Может быть, тогда мы найдем для вас крючок, с которым вы сможете взаимодействовать.
Добавил к моему вопросу. Большое спасибо.
Нет, эта часть не покрывается обработкой ошибок. Вы должны обеспечить правильное поведение в своем собственном коде.
Простите, Артем, а правильно ли игнорировать разделы, возвращающие null? Какое смещение возьмет на себя потребитель для этих разделов?
Это определенно правильно. У вас может быть раздел, но для него нет записей
Это определенно не имеет значения и не принесет особой пользы, поскольку мы не можем использовать его как простую лямбда-форму
filter(Objects::nonNull)- нам нужно проверятьvalueMapEntry, а не всю запись.