Я хотел бы обрабатывать данные от потребителя с помощью темы 1, а затем отправлять сообщения обратно в Кафку в тему 2.
Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka
Моя попытка:
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.info(data);
});
});
producer.on('error', function (err) {})
});
Но Producer не может отправлять обработанные сообщения в Kafka. Ошибка, которую я получил
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit
Я использую модуль узла Kafka-node
Вам нужно переключить порядок прослушивателя готовности производителя и прослушивателя сообщений потребителя.
В противном случае вы настраиваете готовый прослушиватель для каждого потребляемого сообщения.
Например
producer.on('ready', function () {
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.send(payloads, function (err, data) {
console.info(data);
});
});
Я бы посоветовал взглянуть на эту библиотеку, если в основном обработка и пересылка в новые темы https://github.com/nodefluent/kafka-streams/