У меня есть вопрос об управлении несколькими CG, созданными тремя группами потребителей, у каждой CG есть собственная служба kafka, идентификатор группы и тема.
теперь я получаю сообщения, как и ожидалось, но мне интересно, можно ли создать следующий сценарий:
создать три группы потребителей, но получать сообщения только от одной, поставить другие на паузу / удерживать на данный момент, если его служба kafka упадет, потреблять сообщения от следующей группы потребителей, и то же самое с третьей.
Вот пример моего кода:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){ //3
const options = {
groupId: config.kafka_service[i]['groupId'],
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.info(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
if (i > 0){
//pause consumers exept FIRST
customConsumerGroupName.pause();
}
customConsumerGroupName.on('message', (message) => {
console.info(message);
});
customConsumerGroupName.on('error', (error) => {
console.info('consumer group error: ', error);
//HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
//MAYBE consumerGroup.resume(); ???
});
}
}
надеется, что это звучит понятно, спасибо :)
вы это сделали, и вы, и @Moonwalkr говорили об одной CG для обработки этого сценария, но я не видел ни одного примера или руководства по настройке разных центров обработки данных в одной CG. какие-нибудь вести, пожалуйста?
Чтобы настроить отдельных потребителей на одну и ту же CG (в разных центрах обработки данных), вы должны использовать параметр конфигурации groupId. У вас установлено значение 'config.kafka_service [i] [' groupId ']'. Где бы вы ни создавали своего потребителя, вы просто должны убедиться, что у него один и тот же groupId. Я не знаком с пакетом Node kafka, но я бы сказал, что вам следует придерживаться создания обычных потребителей и назначать каждому один и тот же идентификатор группы. Таким образом, отдельные потребители могут быть частью одной группы независимо от того, на каком сервере они размещены.
Я только что прочитал об api узла kafka, а название ConsumerGroup несколько, к сожалению, несколько. Это потребитель kafka, а параметр groupId контролирует «фактическую» группу потребителей kafka. Таким образом, ваш код должен работать до тех пор, пока опция groupId для каждой «новой ConsumerGroup» одинакова. Таким образом, все группы ConsumerGroups будут частью фактической группы потребителей Kafka, читающей из одной и той же темы и гарантирующей, что сообщения не попадут в аварийные потребители (экземпляры ConsumerGroup на языке Node).





Группы потребителей решают два основных сценария:
1. Масштабирование Вы можете увеличить количество потребителей в группе, чтобы обрабатывать увеличивающуюся скорость сообщений, создаваемых в теме (ах), которые использует группа (масштабирование)
2. Восстановление после сбоя Имея группу потребителей, читающих одну и ту же тему (и), они автоматически справятся с ситуацией, когда один или несколько потребителей откажутся.
Таким образом, вместо того, чтобы иметь «резервные» группы потребителей, где вы должны сами решать, какие из них активны, вы просто полагаетесь на встроенное аварийное переключение Kafka. Потребители могут работать в нескольких разных контейнерах (даже в разных центрах обработки данных), и Kafka автоматически гарантирует, что сообщения доставляются отдельным потребителям, независимо от того, где они находятся и сколько из них работает в любой момент времени.
если вы запустите 3 экземпляра одного и того же файла узла (который имеет consumerGroup), каждый из них получил одни и те же данные, фактически создавая дубликаты. Предполагается, что только один потребитель сможет обрабатывать данные, если он принадлежит к тому же groupId.
Похоже, что путаница возникает из-за названия ConsumerGroup пакета Node. В терминах Kafka группа потребителей контролируется исключительно groupId, используемым каждым потребителем. Потребители с одинаковым groupId не будут получать повторяющиеся сообщения, каждое сообщение темы читается только одним потребителем. Если потребитель выходит из строя, kafka обнаруживает это и передает разделы отдельному потребителю.
Узел ConsumerGroup - это просто еще один потребитель Kafka (новый Consumer с группами, управляемыми Kafka, а не zookeeper в Kafka> 0.9).
Таким образом, способ использования группы потребителей kafka с помощью Node ConsumerGroup будет следующим:
function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){ //3
const options = {
groupId: 'SOME_GROUP_NAME',
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.info(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
customConsumerGroupName.on('message', (message) => {
console.info(message);
});
customConsumerGroupName.on('error', (error) => {
console.info('consumer group error: ', error);
//Error handling logic here, restart the consumer that failed perhaps?
//Depends on how you want to managed failed consumers.
});
}
}
Каждый экземпляр Nodes ConsumerGroup будет членом группы SOME_GROUP_NAME, а любые другие потребители, созданные с тем же groupId, также будут действовать как члены той же группы потребителей kafka, независимо от сервера и т. д.
Большое спасибо, хорошие моменты, я немного обработал ошибки, в основном из-за сбоя брокера и тайм-аута, как вы упомянули. следующий потребитель занимает одно место. ваше здоровье!
Похоже, вы пытаетесь использовать отдельные группы потребителей в качестве средства защиты от сбоев, но я считаю, что одна группа потребителей должна делать то, что вы хотите. В каждой группе должно быть несколько потребителей, и если один из них откажет, другой в группе продолжит работу с того места, где остановился. Если только я не понял вашу проблему.