Тестирование исключений в ConsumerAwareRebalanceListener

следуя предложениям, данным мне в ответах на мой предыдущий вопрос Как протестировать ConsumerAwareRebalanceListener?, мне удалось настроить ConsumerAwarereBalanceListener с помощью Spring Kafka 2.1.x.

Однако следующий код

Map<TopicPartition, Long> queries = new HashMap<>();
final Long rewindTime = getRewindTime();
for (TopicPartition partition : partitions) {
    queries.put(partition, rewindTime);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(queries);
result.forEach((key, value) -> consumer.seek(key, value.offset()));

выдает NullPointerException, если результат запроса по теме пуст. В этом случае value - это null. Трассировка стека выглядит следующим образом.

java.lang.NullPointerException: null
    at org.rcardin.MyConsumerAwareRebalancedListener.lambda$onPartitionsAssigned$0(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
    at java.util.HashMap.forEach(HashMap.java:1288) ~[na:1.8.0_144]
    at org.rcardin.MyConsumerAwareRebalancedListener.onPartitionsAssigned(MyConsumerAwareRebalancedListener.java:40) ~[classes/:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$1.onPartitionsAssigned(KafkaMessageListenerContainer.java:596) ~[spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]

Я пытаюсь написать тест, цель которого - проверить это поведение. Однако, поскольку исключение происходит в отдельном потоке, а не в основном потоке, я не могу получить такое исключение.

Любое предложение?

Большое спасибо,

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
145
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Согласно документации offsetsForTimes(), это похоже на ошибку в вашем коде:

 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.

Итак, вам обязательно стоит защитить свой value.offset() с помощью нулевой проверки:

result.entrySet()
            .stream()
            .filter(e -> e.getValue() != null)
            .forEach(e -> consumer.seek(e.getKey(), e.getValue().offset()));

Это определенно не имеет значения и не принесет особой пользы, поскольку мы не можем использовать его как простую лямбда-форму filter(Objects::nonNull) - нам нужно проверять valueMapEntry, а не всю запись.

Artem Bilan 29.10.2018 15:28

Да, я знаю, что это ошибка :) Однако я пытаюсь написать тест, который может показать мне мою ошибку. Извините, возможно, мой вопрос не так понятен. Вы можете помочь мне в этом?

riccardo.cardin 29.10.2018 15:29

Покажите, пожалуйста, трассировку стека для этой ошибки. Может быть, тогда мы найдем для вас крючок, с которым вы сможете взаимодействовать.

Artem Bilan 29.10.2018 15:32

Добавил к моему вопросу. Большое спасибо.

riccardo.cardin 29.10.2018 15:37

Нет, эта часть не покрывается обработкой ошибок. Вы должны обеспечить правильное поведение в своем собственном коде.

Artem Bilan 29.10.2018 16:07

Простите, Артем, а правильно ли игнорировать разделы, возвращающие null? Какое смещение возьмет на себя потребитель для этих разделов?

riccardo.cardin 31.10.2018 10:14

Это определенно правильно. У вас может быть раздел, но для него нет записей

Artem Bilan 31.10.2018 14:08

Другие вопросы по теме