Я пытаюсь написать интеграционный тест для своего потребителя Kafka, используя @SpringBootTest и Testcontainers для базовой инфраструктуры. Моя установка выглядит так:
@Component
@ConditionalOnProperty("outdoor.kafka.enableKafkaReading")
public class Consumer {
@Value("${topic.name}
public final String topic;
@Value("${consumer.group}"
public final String consumerGroup;
@KafkaListener(topics = "#{__listener.topic}", groupId = "#{__listener.consumerGroup}",
containerFactory = "containerFactory")
public void consume(String message) {
LOGGER.info("Received message: {}", message);
}
}
@Configuration
public class KafkaConsumerConfig {
// I actually use custom deserializers for my own entities, using String here for example's sake
@Bean
public ConsumerFactory<String, String> consumerFactory(ObjectMapper objectMapper) {
return new DefaultKafkaConsumerFactory<>(
kafkaProperties.buildConsumerProperties(null), new StringDeserializer(), new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(CommonErrorHandler errorHandler,
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(leaderboardStandingEventConsumerFactory);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}
В моем тесте я хочу написать что-нибудь в теме с помощью KafkaTemplate, а затем убедиться, что прослушиватель был вызван. Так:
@SpringBootTest
@ContextConfiguration(initializers = TestContainersInitializer.class)
class ConsumerTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@SpyBean
private Consumer consumer;
@Test
void consume() {
Instant now = Instant.now();
kafkaTemplate.send("topic-name", "1", "message");
await()
.pollInterval(Duration.ofSeconds(1))
.atMost(5, SECONDS)
.untilAsserted(() -> {
verify(consumer).consume(any());
});
}
}
Мое сообщение написано правильно по теме. Однако прослушиватель не запускается для обработки сообщения. Я в этом уверен, потому что если я использую стандартный KafkaConsumer для проверки содержимого, оно пишется правильно. Вы хоть представляете, что здесь не так?
Поскольку изначально это более сложная настройка, я попробовал запустить и более простой вариант, описанный выше, но он все равно не работает.
Некоторые ссылки, с которыми я консультировался:
Также поднят вопрос по проекту Spring Boot: https://github.com/spring-projects/spring-boot/issues/41009
@GeertPt да, я забыл добавить сюда строку, я определяю ее при запуске. Но даже если полностью исключить условное выражение, оно все равно не поражает слушателя.
Кроме того, подключение выполнено правильно — например, введенные поля в Consumer имеют правильные значения. Единственное, что не запускается — это сам потребительский метод.




Проблема заключалась в том, что мне не хватало конфигурации @TestPropertySource(properties = "spring.kafka.consumer.auto-offset-reset=earliest") в тестовом классе, а потребителю не хватало сообщений, которые я писал.
Мне не хватает той части, где вы установили
outdoor.kafka.enableKafkaReading. Это в вашем application.yaml? Вы пробовали без @ConditionalOnProperty?