@KafkaListener в случае модульного теста не потребляет из фабрики контейнеров

Я написал тестовый пример JUnit для тестирования кода в уроке «С конфигурацией Java» в документации Spring Kafka. (https://docs.spring.io/spring-kafka/reference/htmlsingle/#_with_java_configuration). Единственная разница в том, что я использую в классе встроенный сервер Kafka вместо сервера localhost. Я использую Spring Boot 2.0.2 и его зависимость Spring-Kafka.

Во время выполнения этого тестового примера я вижу, что Потребитель не читает сообщение из темы, и проверка assertTrue завершается ошибкой. Других ошибок нет.

@RunWith(SpringRunner.class)
public class SpringConfigSendReceiveMessage {

    public static final String DEMO_TOPIC =  "demo_topic";
    @Autowired
    private Listener listener;

    @Test
    public void testSimple() throws Exception {
        template.send(DEMO_TOPIC, 0, "foo");
        template.flush();
        assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
    }

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaEmbedded kafkaEmbedded() {
            return new KafkaEmbedded(1, true, 1, DEMO_TOPIC);
        }

        @Bean
        public ConsumerFactory<Integer, String> createConsumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(createConsumerFactory());
            return factory;
        }

        @Bean
        public Listener listener() {
            return new Listener();
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(props);
        }

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate() {
            return new KafkaTemplate<Integer, String>(producerFactory());
        }
    }       

}

class Listener {
    public final CountDownLatch latch = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = DEMO_TOPIC)
    public void listen1(String foo) {
        this.latch.countDown();
    }
}

Я думаю, это потому, что @KafkaListener использует некоторую неправильную настройку / настройку по умолчанию при чтении из темы. Я не вижу ошибок в логах.

Это правильный пример модульного теста? Как мне найти объект, созданный для аннотации KafkaListener, и посмотреть, от какого брокера Kafka он потребляет? Любые комментарии будут полезны. Спасибо.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
3
0
3 984
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Сообщение отправляется до запуска потребителя.

По умолчанию новые потребители начинают потребление в конце темы.

Добавлять

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

вы имеете в виду нового потребителя той же группы? или новый потребитель другой новой группы? @ Грей Рассел

Deadpool 07.07.2018 17:24

Поскольку это встроенный экземпляр кафки, все группы являются «новыми» (смещения еще не зафиксированы), поэтому они будут использовать с конца темы по умолчанию, если вы не установите для свойства значение «самый ранний» (или дождитесь, пока не будут назначены разделы перед отправкой сообщение).

Gary Russell 07.07.2018 17:27

но apache kafka для этого совершенно противоположен, не так ли? каждый новый потребитель новой группы потребляет с самого начала, а новый потребитель существующей группы потребляет с последнего зачета, верно? просто пытаюсь понять разницу рабочего механизма между ними @Gray Russel

Deadpool 07.07.2018 17:30

Нет; это совсем не правильно, см. auto.offset.reset в документации kafka. По умолчанию это latest; свойство определяет, где группа потребителей начинает использовать, если она никогда не использовала тему / раздел. Я не совсем понимаю, что вы подразумеваете под «новым потребителем существующей группы». Группа потребителей либо никогда не использовала тему / раздел, либо потребляла. В первом случае положение определяется собственностью; в последнем случае он потребляет из последнего зафиксированного смещения.

Gary Russell 07.07.2018 18:01

Ответ @ gary-russell - лучшее решение. Другой способ решить эту проблему - отложить шаг отправки сообщения на некоторое время. Это позволит потребителю быть готовым. Следующее также является правильным решением.

Извлеченный урок. Для модульного тестирования потребителей Kafka необходимо либо использовать все сообщения в тестовом примере, либо убедиться, что Consumer готов до того, как Producer отправит сообщение.

@Test
public void testSimple() throws Exception {
    Thread.sleep(1000);
    template.send(DEMO_TOPIC, 0, "foo");
    template.flush();
    assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
}

Другие вопросы по теме