Я реализую конечную точку в spring-boot, которая при вызове будет делать дамп всех сообщений, находящихся в теме kafka (для тестирования).
Поведение, которое я ожидаю, заключается в том, что когда производитель пишет в тему «testTopic», а затем опрашивает потребитель, он должен прочитать только что созданное сообщение.
Поведение, которое я наблюдаю, заключается в том, что потребитель не может использовать созданное сообщение. Далее, если производитель выдает намного больше сообщений (скажем, 10-15), то потребитель сбрасывает их все за один раз. С этого момента, если производитель создает хотя бы одно сообщение, потребитель будет потреблять его, как и ожидалось.
Интуитивно я подумал, что настройка FETCH_MIN_BYTES_CONFIG
может быть как-то связана с этим — возможно, потребитель ждал, пока будет записано достаточно байтов. Но это уже установлено в 1 байт (по умолчанию) и не объясняет последующее успешное чтение отдельных сообщений.
Затем я подумал, что, возможно, я регистрирую потребителя до создания темы (слишком быстро вызывая конечную точку регистрации). Но я подтвердил от kafka-topics.sh
, что тема существует до регистрации потребителя.
Я заметил, что если я включаю автофиксацию смещений, то поведение иногда соответствует ожидаемому, а иногда нет. При ручной фиксации смещений (не показано в коде ниже) поведение очень странное, как описано выше.
Я также знаю, что продюсер работает должным образом, подтверждая это с помощью kafka-console-consumer
.
Также попытался увеличить время ожидания опроса до 1 секунды, но безуспешно.
// Consumer
@Component
public class TestConsumer{
private KafkaConsumer testConsumer = null;
public void registerConsumer(final String consumerId) {
if (consumer == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
testConsumer = new KafkaConsumer<String, String>(props);
testConsumer.subscribe(Collections.singletonList("testTopic"));
}
else{
logger.debug("Consumer already registered");
}
}
public Map<String, List<String>> consume() {
Map<String, List<String>> messages = new HashMap<>();
if (testConsumer == null){
logger.error("testConsumer was not instantiated");
return null;
}
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
List<String> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record: records){
logger.debug(String.format("Consuming %s", record.value()));
buffer.add(record.value());
}
messages.put("data", buffer);
return messages;
}
}
Последовательность шагов такова: 1. запуск весеннего загрузочного приложения 2. тема kafka создана, могу подтвердить через консоль kafka 3. Регистрирую производителя и потребителя 4. Производитель производит, и я могу подтвердить это с помощью консоли kafka (другая группа потребителей). 5. Потребитель не потребляет
Я ожидаю, что результат будет следующим:
{
"data" : ["message1"]
}
Я получаю
{
"data" : []
}
Любые идеи, почему потребитель не потребляет записи, пока не будет написано пороговое количество сообщений?
РЕДАКТИРОВАТЬ_1:
Добавлено свойство props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
к потребителю без эффекта.
@Deadpool Потребитель не потребляет записи, пока не будет написано пороговое количество сообщений. Почему?
Откуда и как вы вызываете метод consume()
? public Map<String, List<String>> consume()
Покажите, как вы производите данные @jateeq
Поскольку вы вручную вызываете это testConsumer.poll(Duration.ofMillis(100))
. Вам нужно постоянно пул из темы. Как внутри бесконечного цикла while. например:
while (true) {
Map records = consume();
logger.debug("received records: {}", records);
}
Взгляните на эту ссылку: Кафка потребитель
вроде бы это было. Я не хотел запускать бесконечный цикл while, потому что потребитель предназначался для тестирования, поэтому я подумал, что опрошу один раз, и все. Я согласился на опрос для конечного числа итераций. Спасибо!
на самом деле, каков ваш вопрос? потребитель не потребляет записи? или что-нибудь еще?