Kafka Consumer читает только после того, как было произведено «достаточно» данных

Я реализую конечную точку в spring-boot, которая при вызове будет делать дамп всех сообщений, находящихся в теме kafka (для тестирования).

Поведение, которое я ожидаю, заключается в том, что когда производитель пишет в тему «testTopic», а затем опрашивает потребитель, он должен прочитать только что созданное сообщение.

Поведение, которое я наблюдаю, заключается в том, что потребитель не может использовать созданное сообщение. Далее, если производитель выдает намного больше сообщений (скажем, 10-15), то потребитель сбрасывает их все за один раз. С этого момента, если производитель создает хотя бы одно сообщение, потребитель будет потреблять его, как и ожидалось.

Интуитивно я подумал, что настройка FETCH_MIN_BYTES_CONFIG может быть как-то связана с этим — возможно, потребитель ждал, пока будет записано достаточно байтов. Но это уже установлено в 1 байт (по умолчанию) и не объясняет последующее успешное чтение отдельных сообщений.

Затем я подумал, что, возможно, я регистрирую потребителя до создания темы (слишком быстро вызывая конечную точку регистрации). Но я подтвердил от kafka-topics.sh, что тема существует до регистрации потребителя.

Я заметил, что если я включаю автофиксацию смещений, то поведение иногда соответствует ожидаемому, а иногда нет. При ручной фиксации смещений (не показано в коде ниже) поведение очень странное, как описано выше.

Я также знаю, что продюсер работает должным образом, подтверждая это с помощью kafka-console-consumer.

Также попытался увеличить время ожидания опроса до 1 секунды, но безуспешно.

// Consumer
@Component
public class TestConsumer{
    private KafkaConsumer testConsumer = null;

    public void registerConsumer(final String consumerId) {
        if (consumer == null) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");

            testConsumer = new KafkaConsumer<String, String>(props);
            testConsumer.subscribe(Collections.singletonList("testTopic"));
        }
        else{
            logger.debug("Consumer already registered");
        }
    }

    public Map<String, List<String>> consume() {
        Map<String, List<String>> messages = new HashMap<>();
        if (testConsumer == null){
            logger.error("testConsumer was not instantiated");
            return null;
        }
        ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
        List<String> buffer = new ArrayList<>(); 
        for (ConsumerRecord<String, String> record: records){
            logger.debug(String.format("Consuming %s", record.value()));
            buffer.add(record.value());
        }
        messages.put("data", buffer);
        return messages;
    }
}

Последовательность шагов такова: 1. запуск весеннего загрузочного приложения 2. тема kafka создана, могу подтвердить через консоль kafka 3. Регистрирую производителя и потребителя 4. Производитель производит, и я могу подтвердить это с помощью консоли kafka (другая группа потребителей). 5. Потребитель не потребляет

Я ожидаю, что результат будет следующим:

{
    "data" : ["message1"]
}

Я получаю

{
    "data" : []
}

Любые идеи, почему потребитель не потребляет записи, пока не будет написано пороговое количество сообщений?

РЕДАКТИРОВАТЬ_1: Добавлено свойство props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); к потребителю без эффекта.

на самом деле, каков ваш вопрос? потребитель не потребляет записи? или что-нибудь еще?

Deadpool 17.05.2019 03:30

@Deadpool Потребитель не потребляет записи, пока не будет написано пороговое количество сообщений. Почему?

jateeq 17.05.2019 04:06

Откуда и как вы вызываете метод consume()? public Map<String, List<String>> consume()

MD Ruhul Amin 17.05.2019 04:25

Покажите, как вы производите данные @jateeq

Deadpool 17.05.2019 06:17
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
4
208
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку вы вручную вызываете это testConsumer.poll(Duration.ofMillis(100)). Вам нужно постоянно пул из темы. Как внутри бесконечного цикла while. например:

while (true) {
   Map records = consume();
   logger.debug("received records: {}", records);
}

Взгляните на эту ссылку: Кафка потребитель

вроде бы это было. Я не хотел запускать бесконечный цикл while, потому что потребитель предназначался для тестирования, поэтому я подумал, что опрошу один раз, и все. Я согласился на опрос для конечного числа итераций. Спасибо!

jateeq 18.05.2019 04:07

Другие вопросы по теме