Я использовал RDKAFKA в php для параллельного запуска потребителя.
Но первый потребитель потребил все сообщение из темы, поэтому второй потребитель не получил сообщения темы.
Поэтому я использовал разные идентификаторы группы для разных потребителей, но все равно возникает такая же проблема.
Помогите, пожалуйста.
$
conf-> set ('group.id', 'myConsumerGroup');
$
conf-> set ('metadata.broker.list', '127.0.0.2');
$
topicConf = новый RdKafka \ TopicConf ();
$
topicConf-> set ('auto.offset.reset', 'наименьший');
$
conf-> setDefaultTopicConf ($
topicConf);
$
consumer1 = новый RdKafka \ KafkaConsumer ($
conf);
$
consumer1-> подписаться (['dbtest']);
while (true) {
$
j = 0;
$
message = $
consumer1-> потреблять (120 * 1000);
switch ($
message-> err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$
dataEx = json_decode ($
message-> полезная нагрузка, истина);
var_dump ($
data);
$
sql = "ВСТАВИТЬ В emp (имя, адрес электронной почты) ЗНАЧЕНИЯ ('" .$
dataEx [' имя ']. "', '". $
dataEx [' электронная почта ']. "')";
`$`servername = "localhost";
`$`username = "A";
`$`password = "ASD";
`$`dbname = "test";
`$`conn = new mysqli(`$`servername, `$`username, `$`password, `$`dbname);
if (`$`conn->connect_error) {
die("Connection failed: " . `$`conn->connect_error);
}
if (`$`conn->query(`$`sql) === TRUE) {
echo "New record created successfully to datbase ".`$`dbname."/n";
} else {
echo "Error: " . `$`sql . "<br>" . `$`conn->error;
}
`$`conn->close();
echo "produced `$`j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception(`$`message->errstr(), `$`message->err);
break;
}
$j++;
}
эхо "здесь";
$
conf-> set ('group.id', 'myConsumerGroup1');
$
conf-> set ('metadata.broker.list', '127.0.0.2');
$
topicConf = новый RdKafka \ TopicConf ();
$
topicConf-> set ('auto.offset.reset', 'наименьший');
$
conf-> setDefaultTopicConf ($
topicConf);
$
consumer2 = новый RdKafka \ KafkaConsumer ($
conf);
эхо "сонали";
$
consumer2-> подписаться (['dbtest']);
while (true) {
$
message2 = $
consumer2-> потреблять (120 * 1000);
switch ($
message2-> err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$
dataEx = json_decode ($
message2-> payload, true);
`` $ sql = "ВСТАВИТЬ В emp (имя, адрес электронной почты) ЗНАЧЕНИЯ ('". $ dataEx [' имя ']. "', '". $
dataEx [' электронная почта ']. "')";
$ servername1 = "локальный хост";
$ username1 = «А»;
$ password1 = "ASD";
$ dbname1 = "test1";
$
conn1 = новый mysqli ($
servername1, $
username1, $
password1, $
dbname1);
if ($
conn1-> connect_error) {
die ("Ошибка подключения:". $
conn1-> connect_error);
}
if (`$`conn1->query(`$`sql) === TRUE) {
echo "New record created successfully ".`$`dbname1;
} else {
echo "Error: " . `$`sql . "<br>" . `$`conn1->error;
}
`$`conn1->close();
echo "produced `$`j----------------------<br> ";
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception(`$`message->errstr(), `$`message->err);
break;
}
`$`j++;
}
попробуйте добавить в свои вопросы код того, что вы пробовали до сих пор. это поможет людям помочь вам.
Изменено форматирование.
Пожалуйста, добавьте используемый здесь код.
нужен код для этого