KafkaConsumer не использует сообщения

Из того, что я прочитал до сих пор, потребитель должен правильно потреблять сообщения, но единственный вывод, который я получаю, - это «.».

public class KafkaConsumerSample {

public static void main(String []args) {
    runProducer();
    //runConsumer();
}

static void runProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<Long, String> producer = new KafkaProducer<Long, String>(props);

     for (int i = 0; i < 10; i++) {
         producer.send(new ProducerRecord<Long, String>("foo", "test message" + Integer.toString(i)));
     }
     producer.close();
}

static void runConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props); 
    consumer.subscribe(Arrays.asList("foo"));

    while(true) {
        ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(10));
        System.out.println(".");
        for(ConsumerRecord<Long, String> record: records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

}

Журнал Kafka сообщает мне следующее:

Member consumer-1 in groupp test has failed, removing it from the group

Preparing to rebalance group test with old generation 21

Group test with generation 22 is now empty

Я не очень понимаю, что это значит. Что мне не хватает? Мне это кажется очень простым.

Обновить

Я хотел бы прочитать все смещения один раз, когда я запускаю потребителя. От смещения 0 до последнего смещения. Я пытался :

static void runConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
        KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props); 
        consumer.subscribe(Arrays.asList("foo"));


        while(true) {
            consumer.seekToBeginning(consumer.assignment());
            ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(5));

            System.out.println(".");
            for(ConsumerRecord<Long, String> record: records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

Просто установить AUTO_OFFSET_RESET_CONFIG на самое раннее, по-видимому, недостаточно. Я запустил потребитель один раз, и он напечатал все смещения от 0 до последнего, но теперь он больше не работает после повторного запуска? Не уверен, что правильно понимаю этот конфиг. Вот почему я добавил метод Consumer.seek(), но где именно его разместить? Если я помещу его в цикл while(true), как показано, он печатает все от смещения 0 до последнего, но не каждые 5 секунд, а без перерыва... постоянно спамит мою консоль.

но, согласно здесь, ваш идентификатор группы потребителей - group-id, откуда взялся этот groupp test

Deadpool 19.01.2019 23:42

Убедитесь, что вы установили для свойства ConsumerConfig.AUTO_OFFSET_RESET_CONFIG значение OffsetResetStrategy.EARLIEST.toString().toLowerCase(). Проверить: stackoverflow.com/questions/53867775/…

Bartosz Wardziński 19.01.2019 23:43

Возможно, вы захотите разделить код производителя и потребителя на два класса. В реальном сценарии они будут отдельными. Тогда у вас может быть один цикл while с производителем, а другой с потребителем, тогда вы будете видеть сообщения, потому что потребитель может начать с конца темы и ждать, пока производитель отправит сообщения.

OneCricketeer 20.01.2019 01:06

Спасибо. Это как-то сработало, но я не уверен, что полностью это понимаю. Я прочитал документы, и если я правильно понимаю, если смещение не найдено (как это вообще может произойти? Потому что я не смотрю на конкретное смещение, не так ли?), тогда он берет самое раннее, что именно? 0? что значит самый ранний? Также в чем разница между этой конфигурацией и Consumer.seek()? Что, если я хочу читать все смещения от 0 до последнего каждый раз, когда я потребляю? Я пытался сделать эту работу (см. редактирование в основном посте), но есть проблема.

M4V3N 20.01.2019 12:12

@cricket_007 обновил мой пост

M4V3N 20.01.2019 12:18

auto.offset.reset применяется только в том случае, если группа потребителей не существует. Самый ранний - это буквально то, что это означает. Чтение из самого старого неиспользованного смещения, которое может быть равно 0, но не всегда (смещение 0 может истечь). Если вы всегда хотите читать с начала, вам нужно явно перейти к началу - stackoverflow.com/questions/49723182/…, но учтите, что срок действия данных вашей темы может истечь, поэтому вы, возможно, захотите отключить это или не полагаться на то, что Kafka хранит эти данные навсегда

OneCricketeer 20.01.2019 16:05

@cricket_007 cricket_007 Думаю, я просто запутался, почему мой потребитель не будет получать никаких сообщений, когда метод seek() находится вне цикла while(true).

M4V3N 20.01.2019 17:05

ну интересно, по-видимому, сначала нужно назначить разделы. Поэтому сначала необходимо вызвать Consumer.poll()! Не читал это нигде, но полезно знать.

M4V3N 21.01.2019 20:21
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
8
213
0

Другие вопросы по теме