Правильный способ чтения сообщений из темы Kafka и последующего закрытия

Я создаю простой API-интерфейс node.js, используя экспресс-узел и узел kafka, который возвращает непрочитанные сообщения из запрошенной темы Kafka и группы потребителей при получении HTTP-запроса, а затем закрывает соединение. Мне не нужно или я не хочу, чтобы потребитель продолжал ждать новых сообщений.

В kafka-node, как правильно проверить, достигнут ли конец темы, и если да, закрыть соединение с брокером и выйти из приложения, чтобы предотвратить чтение новых сообщений?

Вот мой потребитель.js. Это почти то же самое, что и пример, приведенный в документации kafka-node.

"use strict";

const kafka = require("kafka-node");

let topicName = "testTopic-01",
  groupName = "testGroup-01",
  consumerOptions = {
    kafkaHost: "localhost: 9092",
    groupId: groupName,
    sessionTimeout: 15000,
    protocol: ["roundrobin"],
    fromOffset: "earliest",
    encoding: "utf8"
  };

const consumerGroup = new kafka.ConsumerGroup(consumerOptions, topicName);

consumerGroup.on("message", message => {
  console.info(`Message: ${message.value}`);
});
consumerGroup.on("error", error => {
  console.error(error);
});

console.info(`Consumer started on topic ${topicName} on group ${groupName}`);
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
748
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете получить текущее смещение тематический раздел, используя #Компенсировать. Сравнив полученное таким образом смещение назначенного раздела темы, вы узнаете, какое последнее сообщение в соответствующем разделе темы.

Имейте в виду, что если у вас есть несколько потребителей параллельно, вы должны отслеживать раздел темы, к которому был назначен ваш потребитель в группе потребителей (#fetchCommits).

Думаю, я мог бы решить эту проблему, проверив последнее смещение для каждого раздела в теме и сравнив его с ltest зафиксированное смещение в группе потребителей. Пока последнее смещение темы выше, чем последнее зафиксированное смещение, появляются новые сообщения. Я попробую это. Спасибо!

Kilipukki 31.05.2019 22:07

Другие вопросы по теме