Как протестировать ConsumerAwareRebalanceListener?

Я разработал @KafkaListener, который также помечен интерфейсом ConsumerAwareRebalanceListener, используя Spring Boot 2.0.6. Я реализовал метод onPartitionsAssigned, в котором я перематываю смещение на фиксированный промежуток времени, скажем, 60 секунд.

Все идет нормально.

Как я могу протестировать описанный выше вариант использования с помощью инструментов, которые предоставляет мне Spring Kafka? Я предположил, что мне нужно запустить брокера Kafka (то есть EmbeddedKafka), затем остановить прослушиватель, а затем перезагрузить его снова, чтобы проверить, что он снова прочитал сообщения, полученные за последние 60 секунд.

Кто-нибудь может мне помочь? Немного погуглил, но ничего не нашел. Большое спасибо.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
4
0
2 463
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

@KafkaListener имеет:

/**
 * The unique identifier of the container managing for this endpoint.
 * <p>If none is specified an auto-generated one is provided.
 * @return the {@code id} for the container managing for this endpoint.
 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
 */
String id() default "";

атрибут, поэтому вы можете получить доступ к его MessageListenerContainer через упомянутый KafkaListenerEndpointRegistry, который вы можете просто @Autowired в тестовый класс на основе Spring Testing Framework. Тогда вы действительно можете stop() и start(), что MessageListenerContainer в вашем методе тестирования.

Также обратите внимание, как @KafkaListener также имеет атрибут autoStartup().

Но если вы просто хотите провести модульное тестирование своего слушателя, просто вызовите его с имитацией Consumer и убедитесь, что он сделал то, что вы ожидали.

Gary Russell 24.10.2018 18:45

@GaryRussell, можешь привести мне пример? Спасибо.

riccardo.cardin 24.10.2018 20:23
public class MyRebalanceListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        long rewindTo = System.currentTimeMillis() - 60000;
        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
                .collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
        offsetsForTimes.forEach((k, v) -> consumer.seek(k, v.offset()));
    }

}

а также

@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {

    @Test
    public void rebalanceListenerTests() {
        MyRebalanceListener listener = new MyRebalanceListener();
        Consumer<?, ?> consumer = mock(Consumer.class);
        AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
        given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
            AtomicLong offset = new AtomicLong();
            Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
            Map<TopicPartition, Long> argument = i.getArgument(0);
            argument.forEach((k, v) -> {
                offsetsForTimes.put(k, new OffsetAndTimestamp(offset.incrementAndGet(), 0L));
                assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
            });
            return offsetsForTimes ;
        });
        TopicPartition t1 = new TopicPartition("foo", 0);
        TopicPartition t2 = new TopicPartition("foo", 1);
        List<TopicPartition> partitions = new ArrayList<>();
        partitions.add(t1);
        partitions.add(t2);
        listener.onPartitionsAssigned(consumer, partitions);
        verify(consumer).seek(t1, 1);
        verify(consumer).seek(t2, 2);
    }

}

Спасибо, Гэри. Ты спас мне день :)

riccardo.cardin 25.10.2018 11:02

Другие вопросы по теме