Получил готовый сервер Kafka со следующим скриптом
#!/usr/bin/perl
use Net::Kafka::Producer;
use AnyEvent;
my $condvar = AnyEvent->condvar;
my $producer = Net::Kafka::Producer->new(
'bootstrap.servers' => 'localhost:9092'
);
for (my $index = 1;;$index++) {
my $msg = "message: " . $index;
$producer->produce(
payload => $msg,
topic => "tracked-coords"
)->then(sub {
my $delivery_report = shift;
$condvar->send;
print "Message successfully delivered with offset " . $delivery_report->{offset};
}, sub {
my $error = shift;
$condvar->send;
die "Unable to produce a message: " . $error->{error} . ", code: " . $error->{code};
});
}
Почему сервер Kafka останавливается на 100 000 сообщений?
РЕДАКТИРОВАТЬ
Сервер перестает сообщать о получении сообщений. Также потребитель перестает получать сообщения
РЕДАКТИРОВАТЬ
Сервер Kafka регистрирует это (в конце)
message: 99998
message: 99999
message: 100000
[2022-03-21 14:43:30,597] INFO [ProducerStateManager partition=tracked-coords-0] Wrote producer snapshot at offset 500000 with 0 producer ids in 15 ms. (kafka.log.ProducerStateManager)
[2022-03-21 14:43:30,598] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Rolled new log segment at offset 500000 in 18 ms. (kafka.log.Log)
[2022-03-21 14:43:30,599] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) due to retention time 2000ms breach based on the largest record timestamp in the segment (kafka.log.Log)
[2022-03-21 14:43:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Incremented log start offset to 500000 due to segment deletion (kafka.log.Log)
[2022-03-21 14:44:30,610] INFO [Log partition=tracked-coords-0, dir=/tmp/kafka-logs] Deleting segment files LogSegment(baseOffset=400000, size=2191596, lastModifiedTime=1647873685289, largestRecordTimestamp=Some(1647873685290)) (kafka.log.Log$)
[2022-03-21 14:44:30,612] INFO Deleted log /tmp/kafka-logs/tracked-coords-0/00000000000000400000.log.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted offset index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.index.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,612] INFO Deleted time index /tmp/kafka-logs/tracked-coords-0/00000000000000400000.timeindex.deleted. (kafka.log.LogSegment)
[2022-03-21 14:44:30,613] INFO Deleted producer state snapshot /tmp/kafka-logs/tracked-coords-0/00000000000000400000.snapshot.deleted (kafka.log.SnapshotFile)
Вот код для потребителя
#!/usr/bin/perl
use feature qw( say );
use Net::Kafka::Consumer;
use AnyEvent;
use Data::Dumper;
use JSON;
my $consumer = Net::Kafka::Consumer->new(
'bootstrap.servers' => 'localhost:9092',
'group.id' => 'mock_data',
'enable.auto.commit' => 'true',
);
$consumer->subscribe( [ "tracked-coords"] );
while (1) {
my $msg = $consumer->poll(1000);
if ($msg) {
$consumer->commit(); #_message(0, $msg);
say "=================================================================== = ";
if ( $msg->err ) {
say "Error: ", Net::Kafka::Error::to_string($err);
} else {
say $msg->payload;
}
}
}
И потребитель останавливается на 100 тыс.
У производителей ограничен размер партии. Сам брокер не должен останавливаться
"Сервер перестал сообщать"... Где? Какие логи вы смотрите?
Почему ваши журналы perl смешаны с журналами брокера? Я не знаком с perl, но вы можете попробовать тот же цикл с kcat или встроенными bash-скриптами производителя.
@OneCricketeer - они не смешаны - Kafka запускается в другом терминале. Любые хорошие указатели для использования kcat
1) Я предлагаю не хранить ваши данные kafka в /tmp 2) начать со встроенного скрипта kafka-producer-perf-test.sh
с более чем 100 тыс. записей
@OneCricketeer - Спасибо - я попробую
Поскольку вы используете Net::Kafka, который использует библиотеку librdkafka, возможно, это параметр очередь.буферизация.макс.сообщений. По умолчанию это 100 000. Значение:
Maximum number of messages allowed on the producer queue. This queue is shared by all topics and partitions. See: https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html
Попробуйте установить это на меньшее число, возможно, в вашем вызове Net::Kafka::Producer->new(), чтобы увидеть, отключится ли он раньше. Эта настройка поддерживает диапазон 1-10 м. Как ни странно, я не вижу этого в настройках сервера Kafka, так что думаю, это только настройка драйвера edenhill.
Что именно вы подразумеваете под словом «стоп»? Какое сообщение об ошибке отображается?