Использование rdkafka назначение нескольких идентификаторов группы нескольким потребителям

Я использовал RDKAFKA в php для параллельного запуска потребителя.

Но первый потребитель потребил все сообщение из темы, поэтому второй потребитель не получил сообщения темы.

Поэтому я использовал разные идентификаторы группы для разных потребителей, но все равно возникает такая же проблема.

Помогите, пожалуйста. $conf-> set ('group.id', 'myConsumerGroup'); $conf-> set ('metadata.broker.list', '127.0.0.2'); $topicConf = новый RdKafka \ TopicConf (); $topicConf-> set ('auto.offset.reset', 'наименьший'); $conf-> setDefaultTopicConf ($topicConf); $consumer1 = новый RdKafka \ KafkaConsumer ($conf); $consumer1-> подписаться (['dbtest']); while (true) { $j = 0; $message = $consumer1-> потреблять (120 * 1000);

switch ($message-> err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $dataEx = json_decode ($message-> полезная нагрузка, истина); var_dump ($data); $sql = "ВСТАВИТЬ В emp (имя, адрес электронной почты) ЗНАЧЕНИЯ ('" .$dataEx [' имя ']. "', '". $dataEx [' электронная почта ']. "')";

    `$`servername = "localhost";
    `$`username = "A";
    `$`password = "ASD";
    `$`dbname = "test";
    `$`conn = new mysqli(`$`servername, `$`username, `$`password, `$`dbname);
    if (`$`conn->connect_error) {
        die("Connection failed: " . `$`conn->connect_error);
    }

    if (`$`conn->query(`$`sql) === TRUE) {
        echo "New record created successfully to datbase ".`$`dbname."/n";
    } else {
        echo "Error: " . `$`sql . "<br>" . `$`conn->error;
    }
    `$`conn->close();
    echo "produced `$`j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception(`$`message->errstr(), `$`message->err);
    break;
}
$j++;

} эхо "здесь"; $conf-> set ('group.id', 'myConsumerGroup1'); $conf-> set ('metadata.broker.list', '127.0.0.2'); $topicConf = новый RdKafka \ TopicConf (); $topicConf-> set ('auto.offset.reset', 'наименьший'); $conf-> setDefaultTopicConf ($topicConf);

$consumer2 = новый RdKafka \ KafkaConsumer ($conf); эхо "сонали"; $consumer2-> подписаться (['dbtest']); while (true) { $message2 = $consumer2-> потреблять (120 * 1000); switch ($message2-> err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $dataEx = json_decode ($message2-> payload, true); `` $ sql = "ВСТАВИТЬ В emp (имя, адрес электронной почты) ЗНАЧЕНИЯ ('". $ dataEx [' имя ']. "', '". $dataEx [' электронная почта ']. "')"; $ servername1 = "локальный хост"; $ username1 = «А»; $ password1 = "ASD"; $ dbname1 = "test1"; $conn1 = новый mysqli ($servername1, $username1, $password1, $dbname1); if ($conn1-> connect_error) { die ("Ошибка подключения:". $conn1-> connect_error); }

    if (`$`conn1->query(`$`sql) === TRUE) {
        echo "New record created successfully ".`$`dbname1;
    } else {
        echo "Error: " . `$`sql . "<br>" . `$`conn1->error;
    }
    `$`conn1->close();
    echo "produced `$`j----------------------<br> ";
    break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
    echo "No more messages; will wait for more\n";
    break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
    echo "Timed out\n";
    break;
default:
    throw new \Exception(`$`message->errstr(), `$`message->err);
    break;
}
`$`j++;

}

нужен код для этого

Sonali Thakkar 10.09.2018 07:19

попробуйте добавить в свои вопросы код того, что вы пробовали до сих пор. это поможет людям помочь вам.

Zeeshan Adil 10.09.2018 07:38

Изменено форматирование.

Avishek Bhattacharya 10.09.2018 08:13

Пожалуйста, добавьте используемый здесь код.

Avishek Bhattacharya 10.09.2018 08:14
0
4
240
0

Другие вопросы по теме