Невозможно провести модульное тестирование аннотированного метода @KafkaListener

Я пытаюсь провести модульное тестирование потребительского класса kafka весной. Я хочу знать, что если сообщение kafka отправлено в эту тему, метод прослушивателя был вызван правильно. Мой потребительский класс аннотируется следующим образом:

@KafkaListener(topics = "${kafka.topics.myTopic}")
public void myKafkaMessageEvent(final String message) { ...

Если я @Autowire являюсь потребителем, когда я отправляю сообщение kafka, метод слушателя вызывается правильно, но я не могу утверждать, что метод был вызван, потому что класс не является имитацией.

Если я издеваюсь над потребителем, когда я отправляю сообщение kafka, метод слушателя вообще не вызывается. Я могу вызвать метод напрямую и утверждать, что он сработал, но это не делает того, что я хочу, а именно проверки, вызывается ли метод, когда я отправляю сообщение kafka в эту тему.

На данный момент я прибег к установке счетчика внутри потребителя и увеличиваю его каждый раз, когда вызывается метод слушателя, а затем проверяю, что его значение было изменено. Создание переменной только для тестирования кажется мне ужасным решением.

Может быть, есть способ заставить издеваемого потребителя получать сообщения kafka? Или каким-то другим способом утверждать, что был вызван немодельный метод прослушивателя потребителя?

2
0
2 629
1

Ответы 1

Похоже, вы запрашиваете нечто похожее на то, что есть в Spring AMQP Testing Framework: https://docs.spring.io/spring-amqp/docs/2.0.3.RELEASE/reference/html/_reference.html#test-harness

Итак, если вы не умеете использовать дополнительную переменную, вы можете позаимствовать этот решение и реализовать свою собственную «привязь».

Я думаю, что это должно быть хорошим дополнением к Framework, поэтому, пожалуйста, поднимите соответствующий проблема, и мы вместе сможем представить такой инструмент для общественности.

ОБНОВИТЬ

Итак, согласно Spring AMQP Foundation, я сделал это в своей тестовой конфигурации:

public static class KafkaListenerTestHarness extends KafkaListenerAnnotationBeanPostProcessor {

    private final Map<String, Object> listeners = new HashMap<>();

    @Override
    protected void processListener(MethodKafkaListenerEndpoint endpoint, KafkaListener kafkaListener,
            Object bean, Object adminTarget, String beanName) {

        bean = Mockito.spy(bean);

        this.listeners.put(kafkaListener.id(), bean);

        super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName);
    }

    @SuppressWarnings("unchecked")
    public <T> T getSpy(String id) {
        return (T) this.listeners.get(id);
    }

}

...

@SuppressWarnings("rawtypes")
@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public static KafkaListenerTestHarness kafkaListenerAnnotationBeanPostProcessor() {
    return new KafkaListenerTestHarness();
}

Затем в целевом тестовом примере я использую его так:

@Autowired
private KafkaListenerTestHarness harness;
...
Listener listener = this.harness.getSpy("foo");

verify(listener, times(2)).listen1("foo");

Есть KafkaListenerAnnotationBeanPostProcessor для обработки всех этих методов @KafkaListener. Вам просто нужно следовать логике RabbitListenerTestHarness. Позже позвольте мне показать кое-что простое в своем ответе!

Artem Bilan 07.05.2018 19:22

Пожалуйста, найдите ОБНОВЛЕНИЕ в моем ответе.

Artem Bilan 07.05.2018 19:55

Извините, я немного запутался ... Итак, у меня есть это: @Autowired private TestConfig.KafkaListenerTestHarness harness; и это: @Autowired private ReceiveMessageRetrievedEventHandler receiver; Второй - мой потребитель с методом слушателя. Как мне использовать здесь ваш код? И параметр foo я тоже не понимаю ... Я благодарен за помощь, извините, у меня с этим возникли трудности ...

Jorge Bonafé 07.05.2018 20:53

Обратите внимание, как я храню spy - в KafkaListenerTestHarness .listenersMap. И только так можно получить доступ к spy. foo - это id() на @KafkaListener. Посмотрите, что я получаю за ключ карты в KafkaListenerTestHarness.

Artem Bilan 07.05.2018 20:55

Думаю, я понял ... Но теперь у меня ошибка Another endpoint is already registered with id, от которой не могу избавиться ...

Jorge Bonafé 08.05.2018 21:25

Я использую этот идентификатор только в одном месте: @KafkaListener( topic ... id = "ReceiveMessageProductRetrievedEventHandlerID") public void productRetrievedEvent(final ProductTransaction message) { И затем в моем тесте: @ContextConfiguration(classes = {TestConfig.class ... ReceiveMessageProductRetrievedEventHandler.class})@Autowired private TestConfig.KafkaListenerTestHarness harness; Ошибка возникает, если запускается эта строка на проводке: super.processListener(endpoint, kafkaListener, bean, adminTarget, beanName); Если я прокомментирую ее, она исчезнет.

Jorge Bonafé 09.05.2018 15:02

Это не. Wanted but not invoked. здесь: verify(listener, times(1)).productRetrievedEvent(anyObject());. И у меня была точка останова на слушателе, он действительно был вызван. Я прочту весь этот код еще раз, посмотрю, не упустил ли я что-нибудь

Jorge Bonafé 09.05.2018 15:15

Я понимаю, что нигде не использую ваш kafkaListenerAnnotationBeanPostProcessor(), он просто там ... Верно?

Jorge Bonafé 09.05.2018 15:17

Я не понимаю вашего последнего вопроса. kafkaListenerAnnotationBeanPostProcessor() - это bean-компонент, который просто нужно там объявить.

Artem Bilan 09.05.2018 15:18

Если это не сработает, вам следует отказаться от такого решения и пойти другим путем. Также я уже просил вас поднять соответствующий вопрос GH по этому поводу, и мы обязательно сделаем что-нибудь в Framework для этих вариантов использования.

Artem Bilan 09.05.2018 15:21

Другие вопросы по теме