Я разработал @KafkaListener
, который также помечен интерфейсом ConsumerAwareRebalanceListener
, используя Spring Boot 2.0.6. Я реализовал метод onPartitionsAssigned
, в котором я перематываю смещение на фиксированный промежуток времени, скажем, 60 секунд.
Все идет нормально.
Как я могу протестировать описанный выше вариант использования с помощью инструментов, которые предоставляет мне Spring Kafka? Я предположил, что мне нужно запустить брокера Kafka (то есть EmbeddedKafka
), затем остановить прослушиватель, а затем перезагрузить его снова, чтобы проверить, что он снова прочитал сообщения, полученные за последние 60 секунд.
Кто-нибудь может мне помочь? Немного погуглил, но ничего не нашел. Большое спасибо.
@KafkaListener
имеет:
/**
* The unique identifier of the container managing for this endpoint.
* <p>If none is specified an auto-generated one is provided.
* @return the {@code id} for the container managing for this endpoint.
* @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
*/
String id() default "";
атрибут, поэтому вы можете получить доступ к его MessageListenerContainer
через упомянутый KafkaListenerEndpointRegistry
, который вы можете просто @Autowired
в тестовый класс на основе Spring Testing Framework. Тогда вы действительно можете stop()
и start()
, что MessageListenerContainer
в вашем методе тестирования.
Также обратите внимание, как @KafkaListener
также имеет атрибут autoStartup()
.
@GaryRussell, можешь привести мне пример? Спасибо.
public class MyRebalanceListener implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
long rewindTo = System.currentTimeMillis() - 60000;
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(partitions.stream()
.collect(Collectors.toMap(tp -> tp, tp -> rewindTo)));
offsetsForTimes.forEach((k, v) -> consumer.seek(k, v.offset()));
}
}
а также
@RunWith(SpringRunner.class)
@SpringBootTest
public class So52973119ApplicationTests {
@Test
public void rebalanceListenerTests() {
MyRebalanceListener listener = new MyRebalanceListener();
Consumer<?, ?> consumer = mock(Consumer.class);
AtomicLong expected = new AtomicLong(System.currentTimeMillis() - 60_000);
given(consumer.offsetsForTimes(anyMap())).willAnswer(i -> {
AtomicLong offset = new AtomicLong();
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
Map<TopicPartition, Long> argument = i.getArgument(0);
argument.forEach((k, v) -> {
offsetsForTimes.put(k, new OffsetAndTimestamp(offset.incrementAndGet(), 0L));
assertThat(v).isBetween(expected.get(), expected.get() + 1_000);
});
return offsetsForTimes ;
});
TopicPartition t1 = new TopicPartition("foo", 0);
TopicPartition t2 = new TopicPartition("foo", 1);
List<TopicPartition> partitions = new ArrayList<>();
partitions.add(t1);
partitions.add(t2);
listener.onPartitionsAssigned(consumer, partitions);
verify(consumer).seek(t1, 1);
verify(consumer).seek(t2, 2);
}
}
Спасибо, Гэри. Ты спас мне день :)
Но если вы просто хотите провести модульное тестирование своего слушателя, просто вызовите его с имитацией
Consumer
и убедитесь, что он сделал то, что вы ожидали.