Из того, что я прочитал до сих пор, потребитель должен правильно потреблять сообщения, но единственный вывод, который я получаю, - это «.».
public class KafkaConsumerSample {
public static void main(String []args) {
runProducer();
//runConsumer();
}
static void runProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<Long, String> producer = new KafkaProducer<Long, String>(props);
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<Long, String>("foo", "test message" + Integer.toString(i)));
}
producer.close();
}
static void runConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
consumer.subscribe(Arrays.asList("foo"));
while(true) {
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(10));
System.out.println(".");
for(ConsumerRecord<Long, String> record: records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
Журнал Kafka сообщает мне следующее:
Member consumer-1 in groupp test has failed, removing it from the group
Preparing to rebalance group test with old generation 21
Group test with generation 22 is now empty
Я не очень понимаю, что это значит. Что мне не хватает? Мне это кажется очень простым.
Обновить
Я хотел бы прочитать все смещения один раз, когда я запускаю потребителя. От смещения 0 до последнего смещения. Я пытался :
static void runConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
KafkaConsumer<Long, String> consumer = new KafkaConsumer<Long, String>(props);
consumer.subscribe(Arrays.asList("foo"));
while(true) {
consumer.seekToBeginning(consumer.assignment());
ConsumerRecords<Long, String> records = consumer.poll(Duration.ofSeconds(5));
System.out.println(".");
for(ConsumerRecord<Long, String> record: records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
Просто установить AUTO_OFFSET_RESET_CONFIG на самое раннее, по-видимому, недостаточно. Я запустил потребитель один раз, и он напечатал все смещения от 0 до последнего, но теперь он больше не работает после повторного запуска? Не уверен, что правильно понимаю этот конфиг. Вот почему я добавил метод Consumer.seek(), но где именно его разместить? Если я помещу его в цикл while(true), как показано, он печатает все от смещения 0 до последнего, но не каждые 5 секунд, а без перерыва... постоянно спамит мою консоль.
Убедитесь, что вы установили для свойства ConsumerConfig.AUTO_OFFSET_RESET_CONFIG значение OffsetResetStrategy.EARLIEST.toString().toLowerCase(). Проверить: stackoverflow.com/questions/53867775/…
Возможно, вы захотите разделить код производителя и потребителя на два класса. В реальном сценарии они будут отдельными. Тогда у вас может быть один цикл while с производителем, а другой с потребителем, тогда вы будете видеть сообщения, потому что потребитель может начать с конца темы и ждать, пока производитель отправит сообщения.
Спасибо. Это как-то сработало, но я не уверен, что полностью это понимаю. Я прочитал документы, и если я правильно понимаю, если смещение не найдено (как это вообще может произойти? Потому что я не смотрю на конкретное смещение, не так ли?), тогда он берет самое раннее, что именно? 0? что значит самый ранний? Также в чем разница между этой конфигурацией и Consumer.seek()? Что, если я хочу читать все смещения от 0 до последнего каждый раз, когда я потребляю? Я пытался сделать эту работу (см. редактирование в основном посте), но есть проблема.
@cricket_007 обновил мой пост
auto.offset.reset применяется только в том случае, если группа потребителей не существует. Самый ранний - это буквально то, что это означает. Чтение из самого старого неиспользованного смещения, которое может быть равно 0, но не всегда (смещение 0 может истечь). Если вы всегда хотите читать с начала, вам нужно явно перейти к началу - stackoverflow.com/questions/49723182/…, но учтите, что срок действия данных вашей темы может истечь, поэтому вы, возможно, захотите отключить это или не полагаться на то, что Kafka хранит эти данные навсегда
@cricket_007 cricket_007 Думаю, я просто запутался, почему мой потребитель не будет получать никаких сообщений, когда метод seek() находится вне цикла while(true).
ну интересно, по-видимому, сначала нужно назначить разделы. Поэтому сначала необходимо вызвать Consumer.poll()! Не читал это нигде, но полезно знать.




но, согласно здесь, ваш идентификатор группы потребителей -
group-id, откуда взялся этотgroupp test