Я создаю простой API-интерфейс node.js, используя экспресс-узел и узел kafka, который возвращает непрочитанные сообщения из запрошенной темы Kafka и группы потребителей при получении HTTP-запроса, а затем закрывает соединение. Мне не нужно или я не хочу, чтобы потребитель продолжал ждать новых сообщений.
В kafka-node, как правильно проверить, достигнут ли конец темы, и если да, закрыть соединение с брокером и выйти из приложения, чтобы предотвратить чтение новых сообщений?
Вот мой потребитель.js. Это почти то же самое, что и пример, приведенный в документации kafka-node.
"use strict";
const kafka = require("kafka-node");
let topicName = "testTopic-01",
groupName = "testGroup-01",
consumerOptions = {
kafkaHost: "localhost: 9092",
groupId: groupName,
sessionTimeout: 15000,
protocol: ["roundrobin"],
fromOffset: "earliest",
encoding: "utf8"
};
const consumerGroup = new kafka.ConsumerGroup(consumerOptions, topicName);
consumerGroup.on("message", message => {
console.info(`Message: ${message.value}`);
});
consumerGroup.on("error", error => {
console.error(error);
});
console.info(`Consumer started on topic ${topicName} on group ${groupName}`);
Вы можете получить текущее смещение тематический раздел, используя #Компенсировать. Сравнив полученное таким образом смещение назначенного раздела темы, вы узнаете, какое последнее сообщение в соответствующем разделе темы.
Имейте в виду, что если у вас есть несколько потребителей параллельно, вы должны отслеживать раздел темы, к которому был назначен ваш потребитель в группе потребителей (#fetchCommits).
Думаю, я мог бы решить эту проблему, проверив последнее смещение для каждого раздела в теме и сравнив его с ltest зафиксированное смещение в группе потребителей. Пока последнее смещение темы выше, чем последнее зафиксированное смещение, появляются новые сообщения. Я попробую это. Спасибо!