Я пытаюсь использовать класс RoundRobinPartitioner Kafka для равномерного распределения сообщений по всем разделам. Моя конфигурация темы Kafka выглядит следующим образом:
имя: мультисхемакафкатопикодд
количество разделов: 16
коэффициент репликации: 2
Скажем, если я создаю 100 сообщений, то в каждом разделе должно быть 6 или 7 сообщений. Но я получаю что-то похожее на это:
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0
Я подумал, что, может быть, я не произвожу достаточно сообщений, поэтому я попробовал с записями 1M и установил нечетное количество разделов:
Тема: мультисхемакафкатопикодд
количество разделов: 31
коэффициент репликации: 2
... и я получил это. На этот раз количество сообщений в каждом разделе распределено примерно поровну.
sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684
Я снова провел тот же тест, но уменьшил количество разделов до 8 и получил такой результат, в котором мы ясно видим, что в некоторых разделах около 15 КБ сообщений, а в других около 10 КБ:
multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599
Я что-то не так делаю или так и должно быть? Почему такое неравномерное распределение сообщений?
Если кто-нибудь может мне помочь, это было бы здорово. Спасибо.
Насколько я понимаю, разделитель работает хорошо. Но вы должны знать об оптимизации, сделанной производителем, чтобы максимизировать производительность:
Производитель не будет создавать каждое сообщение в другой раздел для каждого вызова отправки, так как это было бы излишним.
Round-Robin
гарантирует аналогичную раздачу, но работает пакетная рассылка. Это означает, что он будет буферизовать количество сообщений, предназначенных для раздела, на основе остатка (не модуля!) операции, выполненной в коде RoundRobinPartitioner
:
int part = Utils.toPositive(nextValue) % availablePartitions.size();
nextValue
— это AtomicInteger
, которое увеличивается на 1 для каждого раздела/вызова отправки. Таким образом, остаток всегда будет увеличиваться на единицу (циклически, например, с 4 разделами: 0-1-2-3-0-1-2-3-...
), при условии, что ни один раздел не будет объявлен недоступным во время процесса. Если это произойдет, цикл может выглядеть так 0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...
(Счетчик номеров сообщений начинается с 0 - new AtomicInteger(0)
)
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
4%4 0
5%4 1
6%4 2
7%4 3
8%4 0
... ...
Когда создается 9-е сообщение, буфер для первого раздела заполняется (поскольку он уже содержит 3 сообщения) и, следовательно, готов к отправке в kafka. Если вы остановите процесс прямо здесь, 4 раздела будут выглядеть так:
Partition Offset
0 3
1 0
2 0
3 0
При создании 10-го сообщения буфер для второго раздела также будет готов к отправке из сети, и тема будет выглядеть так:
Partition Offset
0 3
1 3
2 0
3 0
В реальной жизни буфер обычно содержит большое количество сообщений (это тоже можно настроить). Допустим, например, хранится 1000 сообщений. Для того же сценария разделы будут выглядеть так:
Partition Offset
0 1000
1 1000
2 0
3 0
Следовательно, увеличивается «визуальная» разница между разделами. Чем больше размер партии/буфера, тем печальнее будет.
Это связано с характером самого потока partitioner
производителя: по умолчанию он не будет отправлять каждое сообщение отдельно, а сохраняет их, чтобы отправлять несколько сообщений при каждом вызове брокера, оптимизируя производительность системы.
Пакетная обработка является одним из основных факторов эффективности, и при пакетной обработке производитель Kafka попытается накопить данные в памяти. и отправлять большие партии в одном запросе
Этот дисбаланс может быть более заметным, если производитель остановлен/запущен, поскольку он перезапустит механизм независимо от ранее выбранных разделов (поэтому он может начать отправку в тот же раздел, который был выбран непосредственно перед остановкой, тем самым увеличивая разницу с другими невыбранные разделы из последнего выполнения).
При новом выполнении все буферы будут пустыми, поэтому процесс будет перезапущен независимо от того, какие разделы получили больше всего.
Итак, вы останавливаете процесс здесь:
Partition Offset
0 1000
1 1000
2 0
3 0
Карта, содержащая счетчик количества сообщений для каждой темы, перезапускается, так как она не является частью брокера, , а класса Partitioner от производителя. Если производитель не закрыт должным образом и/или не сброшен, эти кэшированные сообщения также будут потеряны. Итак, в этом сценарии вы получаете повторение предыдущей логики:
MsgN % Partitions Partition
0%4 0
1%4 1
2%4 2
3%4 3
(...)
Что приведет к этому в определенный момент:
Partition Offset
0 2000
1 2000
2 0
3 0
Это дисбаланс, вызванный прерывистым выполнением процесса отправки, но он выходит за рамки RoundRobinPartitioner
, природа которого основана на непрерывном процессе (без остановок).
Вы можете проверить это поведение, проверив смещение каждого раздела при отправке сообщений: только когда выбранный раздел хранит n сообщений, следующий выбранный раздел получит свой пакет из n сообщений.
Примечание. Цифры, показанные в примерах, относятся к «идеальному» сценарию; В реальной жизни сообщения также могут быть отозваны, сжаты, неудачны, очищены независимо от размера буфера, недоступных разделов... что приводит к номерам смещения, таким как показано в вашем вопросе.
Последний пример со сценарием сброса:
Partition Offset
0 1000
1 1000
2 0
3 0
Процесс остановлен, но производитель правильно закрыт и сбрасывает свои сообщения, поэтому тема выглядит так:
Partition Offset
0 1997
1 1996
2 999
3 998
Процесс перезапускается. После сброса буфера первого раздела это будет выглядеть так:
Partition Offset
0 2997
1 1996
2 999
3 998
Следовательно, увеличивается путаница в отношении «капитала» механизма. Но это не его вина, поскольку в карте, счетчике и буферах разделителя нет постоянства. Если вы позволите процессу выполняться в течение нескольких дней без остановки, вы обнаружите, что он действительно уравновешивает сообщения «почти равным» образом.
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster)
{
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions=cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
/*remainder calculus in order to select next partition*/
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
private int nextValue(String topic)
{
/*Counter of num messages sent. topicCounterMap is part of the producer
process, hence not persisted by default.
It will start by 0 for every topic with each new launch*/
AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
return new AtomicInteger(0); });
return counter.getAndIncrement();
}
Он не только отвечает на вопрос, но и предоставляет полезную информацию учащимся!
Я тоже столкнулся с этой проблемой. посмотрите на ответ ниже. stackoverflow.com/a/68335642/6552425