@KafkaListener сContainerFactory не запускается в @SpringBootTest

Я пытаюсь написать интеграционный тест для своего потребителя Kafka, используя @SpringBootTest и Testcontainers для базовой инфраструктуры. Моя установка выглядит так:

@Component
@ConditionalOnProperty("outdoor.kafka.enableKafkaReading")
public class Consumer {

    @Value("${topic.name}
    public final String topic;
    @Value("${consumer.group}"
    public final String consumerGroup;

    @KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.consumerGroup}",
            containerFactory = "containerFactory")
    public void consume(String message) {
        LOGGER.info("Received message: {}", message);
    }
}
@Configuration
public class KafkaConsumerConfig {
    // I actually use custom deserializers for my own entities, using String here for example's sake
    @Bean
    public ConsumerFactory<String, String> consumerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaConsumerFactory<>(
                kafkaProperties.buildConsumerProperties(null), new StringDeserializer(), new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(CommonErrorHandler errorHandler,
                                                                                                                    ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(leaderboardStandingEventConsumerFactory);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

В моем тесте я хочу написать что-нибудь в теме с помощью KafkaTemplate, а затем убедиться, что прослушиватель был вызван. Так:

@SpringBootTest
@ContextConfiguration(initializers = TestContainersInitializer.class)
class ConsumerTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @SpyBean
    private Consumer consumer;

    @Test
    void consume() {
        Instant now = Instant.now();
        kafkaTemplate.send("topic-name", "1", "message");
        await()
            .pollInterval(Duration.ofSeconds(1))
            .atMost(5, SECONDS)
            .untilAsserted(() -> {
                verify(consumer).consume(any());
            });
   }
}

Мое сообщение написано правильно по теме. Однако прослушиватель не запускается для обработки сообщения. Я в этом уверен, потому что если я использую стандартный KafkaConsumer для проверки содержимого, оно пишется правильно. Вы хоть представляете, что здесь не так?

Поскольку изначально это более сложная настройка, я попробовал запустить и более простой вариант, описанный выше, но он все равно не работает.

Некоторые ссылки, с которыми я консультировался:

  1. https://testcontainers.com/guides/testing-spring-boot-kafka-listener-using-testcontainers/
  2. https://www.atomicjar.com/2023/06/testing-kafka-applications-with-testcontainers/

Также поднят вопрос по проекту Spring Boot: https://github.com/spring-projects/spring-boot/issues/41009

Мне не хватает той части, где вы установили outdoor.kafka.enableKafkaReading. Это в вашем application.yaml? Вы пробовали без @ConditionalOnProperty?

GeertPt 07.06.2024 10:29

@GeertPt да, я забыл добавить сюда строку, я определяю ее при запуске. Но даже если полностью исключить условное выражение, оно все равно не поражает слушателя.

Daniel Pop 07.06.2024 10:32

Кроме того, подключение выполнено правильно — например, введенные поля в Consumer имеют правильные значения. Единственное, что не запускается — это сам потребительский метод.

Daniel Pop 07.06.2024 10:37
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
0
3
63
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема заключалась в том, что мне не хватало конфигурации @TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest") в тестовом классе, а потребителю не хватало сообщений, которые я писал.

Другие вопросы по теме