Я написал тестовый пример JUnit для тестирования кода в уроке «С конфигурацией Java» в документации Spring Kafka. (https://docs.spring.io/spring-kafka/reference/htmlsingle/#_with_java_configuration). Единственная разница в том, что я использую в классе встроенный сервер Kafka вместо сервера localhost. Я использую Spring Boot 2.0.2 и его зависимость Spring-Kafka.
Во время выполнения этого тестового примера я вижу, что Потребитель не читает сообщение из темы, и проверка assertTrue завершается ошибкой. Других ошибок нет.
@RunWith(SpringRunner.class)
public class SpringConfigSendReceiveMessage {
public static final String DEMO_TOPIC = "demo_topic";
@Autowired
private Listener listener;
@Test
public void testSimple() throws Exception {
template.send(DEMO_TOPIC, 0, "foo");
template.flush();
assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
}
@Autowired
private KafkaTemplate<Integer, String> template;
@Configuration
@EnableKafka
public static class Config {
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, DEMO_TOPIC);
}
@Bean
public ConsumerFactory<Integer, String> createConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
return factory;
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
}
}
class Listener {
public final CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(id = "foo", topics = DEMO_TOPIC)
public void listen1(String foo) {
this.latch.countDown();
}
}
Я думаю, это потому, что @KafkaListener использует некоторую неправильную настройку / настройку по умолчанию при чтении из темы. Я не вижу ошибок в логах.
Это правильный пример модульного теста? Как мне найти объект, созданный для аннотации KafkaListener, и посмотреть, от какого брокера Kafka он потребляет? Любые комментарии будут полезны. Спасибо.




Сообщение отправляется до запуска потребителя.
По умолчанию новые потребители начинают потребление в конце темы.
Добавлять
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Поскольку это встроенный экземпляр кафки, все группы являются «новыми» (смещения еще не зафиксированы), поэтому они будут использовать с конца темы по умолчанию, если вы не установите для свойства значение «самый ранний» (или дождитесь, пока не будут назначены разделы перед отправкой сообщение).
но apache kafka для этого совершенно противоположен, не так ли? каждый новый потребитель новой группы потребляет с самого начала, а новый потребитель существующей группы потребляет с последнего зачета, верно? просто пытаюсь понять разницу рабочего механизма между ними @Gray Russel
Нет; это совсем не правильно, см. auto.offset.reset в документации kafka. По умолчанию это latest; свойство определяет, где группа потребителей начинает использовать, если она никогда не использовала тему / раздел. Я не совсем понимаю, что вы подразумеваете под «новым потребителем существующей группы». Группа потребителей либо никогда не использовала тему / раздел, либо потребляла. В первом случае положение определяется собственностью; в последнем случае он потребляет из последнего зафиксированного смещения.
Ответ @ gary-russell - лучшее решение. Другой способ решить эту проблему - отложить шаг отправки сообщения на некоторое время. Это позволит потребителю быть готовым. Следующее также является правильным решением.
Извлеченный урок. Для модульного тестирования потребителей Kafka необходимо либо использовать все сообщения в тестовом примере, либо убедиться, что Consumer готов до того, как Producer отправит сообщение.
@Test
public void testSimple() throws Exception {
Thread.sleep(1000);
template.send(DEMO_TOPIC, 0, "foo");
template.flush();
assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
}
вы имеете в виду нового потребителя той же группы? или новый потребитель другой новой группы? @ Грей Рассел