Я пытаюсь провести модульное тестирование потребительского класса kafka весной. Я хочу знать, что если сообщение kafka отправлено в эту тему, метод прослушивателя был вызван правильно. Мой потребительский класс аннотируется следующим образом:
@KafkaListener(topics = "${kafka.topics.myTopic}")
public void myKafkaMessageEvent(final String message) { ...
Если я @Autowire являюсь потребителем, когда я отправляю сообщение kafka, метод слушателя вызывается правильно, но я не могу утверждать, что метод был вызван, потому что класс не является имитацией.
Если я издеваюсь над потребителем, когда я отправляю сообщение kafka, метод слушателя вообще не вызывается. Я могу вызвать метод напрямую и утверждать, что он сработал, но это не делает того, что я хочу, а именно проверки, вызывается ли метод, когда я отправляю сообщение kafka в эту тему.
На данный момент я прибег к установке счетчика внутри потребителя и увеличиваю его каждый раз, когда вызывается метод слушателя, а затем проверяю, что его значение было изменено. Создание переменной только для тестирования кажется мне ужасным решением.
Может быть, есть способ заставить издеваемого потребителя получать сообщения kafka? Или каким-то другим способом утверждать, что был вызван немодельный метод прослушивателя потребителя?
Похоже, вы запрашиваете нечто похожее на то, что есть в Spring AMQP Testing Framework: https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html#test-harness
Итак, если вы не умеете использовать дополнительную переменную, вы можете позаимствовать этот решение и реализовать свою собственную «привязь».
Я думаю, что это должно быть хорошим дополнением к Framework, поэтому, пожалуйста, поднимите соответствующий проблема, и мы вместе сможем представить такой инструмент для общественности.
ОБНОВИТЬ
Итак, согласно Spring AMQP Foundation, я сделал это в своей тестовой конфигурации:
public static class KafkaListenerTestHarness extends KafkaListenerAnnotationBeanPostProcessor {
private final Map<String, Object> listeners = new HashMap<>();
@Override
protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
bean = Mockito.spy(bean);
this.listeners.put(kafkaListener.id(), bean);
super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName);
}
@SuppressWarnings("unchecked")
public <T> T getSpy(String id) {
return (T) this.listeners.get(id);
}
}
...
@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static KafkaListenerTestHarness kafkaListenerAnnotationBeanPostProcessor() {
return new KafkaListenerTestHarness();
}
Затем в целевом тестовом примере я использую его так:
@Autowired
private KafkaListenerTestHarness harness;
...
Listener listener = this.harness.getSpy("foo");
verify(listener, times(2)).listen1("foo");
Пожалуйста, найдите ОБНОВЛЕНИЕ в моем ответе.
Извините, я немного запутался ... Итак, у меня есть это: @Autowired private TestConfig.KafkaListenerTestHarness harness; и это: @Autowired private ReceiveMessageRetrievedEventHandler receiver; Второй - мой потребитель с методом слушателя. Как мне использовать здесь ваш код? И параметр foo я тоже не понимаю ... Я благодарен за помощь, извините, у меня с этим возникли трудности ...
Обратите внимание, как я храню spy - в KafkaListenerTestHarness .listenersMap. И только так можно получить доступ к spy. foo - это id() на @KafkaListener. Посмотрите, что я получаю за ключ карты в KafkaListenerTestHarness.
Думаю, я понял ... Но теперь у меня ошибка Another endpoint is already registered with id, от которой не могу избавиться ...
Я использую этот идентификатор только в одном месте: @KafkaListener( topic ... id = "ReceiveMessageProductRetrievedEventHandlerID") public void productRetrievedEvent(final ProductTransaction message) { И затем в моем тесте: @ContextConfiguration(classes = {TestConfig.class ... ReceiveMessageProductRetrievedEventHandler.class})@Autowired private TestConfig.KafkaListenerTestHarness harness; Ошибка возникает, если запускается эта строка на проводке: super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName); Если я прокомментирую ее, она исчезнет.
Это не. Wanted but not invoked. здесь: verify(listener, times(1)).productRetrievedEvent(anyObject());. И у меня была точка останова на слушателе, он действительно был вызван. Я прочту весь этот код еще раз, посмотрю, не упустил ли я что-нибудь
Я понимаю, что нигде не использую ваш kafkaListenerAnnotationBeanPostProcessor(), он просто там ... Верно?
Я не понимаю вашего последнего вопроса. kafkaListenerAnnotationBeanPostProcessor() - это bean-компонент, который просто нужно там объявить.
Если это не сработает, вам следует отказаться от такого решения и пойти другим путем. Также я уже просил вас поднять соответствующий вопрос GH по этому поводу, и мы обязательно сделаем что-нибудь в Framework для этих вариантов использования.
Есть
KafkaListenerAnnotationBeanPostProcessorдля обработки всех этих методов@KafkaListener. Вам просто нужно следовать логикеRabbitListenerTestHarness. Позже позвольте мне показать кое-что простое в своем ответе!