Разделитель Kafka RoundRobin не распространяет сообщения на все разделы

Я пытаюсь использовать класс RoundRobinPartitioner Kafka для равномерного распределения сообщений по всем разделам. Моя конфигурация темы Kafka выглядит следующим образом:

имя: мультисхемакафкатопикодд

количество разделов: 16

коэффициент репликации: 2

Скажем, если я создаю 100 сообщений, то в каждом разделе должно быть 6 или 7 сообщений. Но я получаю что-то похожее на это:

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:26
multischemakafkatopicodd:5:0
multischemakafkatopicodd:10:24
multischemakafkatopicodd:15:0
multischemakafkatopicodd:13:0
multischemakafkatopicodd:8:26
multischemakafkatopicodd:2:26
multischemakafkatopicodd:12:24
multischemakafkatopicodd:14:24
multischemakafkatopicodd:9:0
multischemakafkatopicodd:11:0
multischemakafkatopicodd:4:26
multischemakafkatopicodd:1:0
multischemakafkatopicodd:6:24
multischemakafkatopicodd:7:0
multischemakafkatopicodd:3:0

Я подумал, что, может быть, я не произвожу достаточно сообщений, поэтому я попробовал с записями 1M и установил нечетное количество разделов:

Тема: мультисхемакафкатопикодд

количество разделов: 31

коэффициент репликации: 2

... и я получил это. На этот раз количество сообщений в каждом разделе распределено примерно поровну.

sh /usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.0.55.211:9092 --topic multischemakafkatopicodd --time -1
multischemakafkatopicodd:0:33845
multischemakafkatopicodd:5:34388
multischemakafkatopicodd:10:33837
multischemakafkatopicodd:20:33819
multischemakafkatopicodd:15:33890
multischemakafkatopicodd:25:34414
multischemakafkatopicodd:30:33862
multischemakafkatopicodd:26:34066
multischemakafkatopicodd:9:34088
multischemakafkatopicodd:11:34124
multischemakafkatopicodd:16:33802
multischemakafkatopicodd:4:34061
multischemakafkatopicodd:17:34977
multischemakafkatopicodd:3:34084
multischemakafkatopicodd:24:33849
multischemakafkatopicodd:23:34111
multischemakafkatopicodd:13:34062
multischemakafkatopicodd:28:33876
multischemakafkatopicodd:18:34098
multischemakafkatopicodd:22:34058
multischemakafkatopicodd:8:34079
multischemakafkatopicodd:2:33839
multischemakafkatopicodd:12:34075
multischemakafkatopicodd:29:34132
multischemakafkatopicodd:19:33924
multischemakafkatopicodd:14:34109
multischemakafkatopicodd:1:34088
multischemakafkatopicodd:6:33832
multischemakafkatopicodd:7:34080
multischemakafkatopicodd:27:34188
multischemakafkatopicodd:21:34684

Я снова провел тот же тест, но уменьшил количество разделов до 8 и получил такой результат, в котором мы ясно видим, что в некоторых разделах около 15 КБ сообщений, а в других около 10 КБ:

multischemakafkatopicodd:0:155927
multischemakafkatopicodd:5:105351
multischemakafkatopicodd:1:107382
multischemakafkatopicodd:4:160533
multischemakafkatopicodd:6:158007
multischemakafkatopicodd:7:105608
multischemakafkatopicodd:2:157934
multischemakafkatopicodd:3:105599

Я что-то не так делаю или так и должно быть? Почему такое неравномерное распределение сообщений?

Если кто-нибудь может мне помочь, это было бы здорово. Спасибо.

Я тоже столкнулся с этой проблемой. посмотрите на ответ ниже. stackoverflow.com/a/68335642/6552425

yousef 11.07.2021 13:15
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
1
2 292
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Насколько я понимаю, разделитель работает хорошо. Но вы должны знать об оптимизации, сделанной производителем, чтобы максимизировать производительность:

  • Производитель не будет создавать каждое сообщение в другой раздел для каждого вызова отправки, так как это было бы излишним.

  • Round-Robin гарантирует аналогичную раздачу, но работает пакетная рассылка. Это означает, что он будет буферизовать количество сообщений, предназначенных для раздела, на основе остатка (не модуля!) операции, выполненной в коде RoundRobinPartitioner:

     int part = Utils.toPositive(nextValue) % availablePartitions.size();
    

nextValue — это AtomicInteger, которое увеличивается на 1 для каждого раздела/вызова отправки. Таким образом, остаток всегда будет увеличиваться на единицу (циклически, например, с 4 разделами: 0-1-2-3-0-1-2-3-...), при условии, что ни один раздел не будет объявлен недоступным во время процесса. Если это произойдет, цикл может выглядеть так 0-1-2-(partition4fails)-0-1-2-(partition4OK)-3-0-...


Пример

  • Тема с 4 разделами
  • Буфер потока разделителя производителя для каждого раздела содержит 3 сообщения.

(Счетчик номеров сообщений начинается с 0 - new AtomicInteger(0))

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
        4%4                0
        5%4                1
        6%4                2 
        7%4                3
        8%4                0
        ...               ...

Когда создается 9-е сообщение, буфер для первого раздела заполняется (поскольку он уже содержит 3 сообщения) и, следовательно, готов к отправке в kafka. Если вы остановите процесс прямо здесь, 4 раздела будут выглядеть так:

    Partition    Offset
       0           3
       1           0
       2           0
       3           0

При создании 10-го сообщения буфер для второго раздела также будет готов к отправке из сети, и тема будет выглядеть так:

    Partition    Offset
       0           3
       1           3
       2           0
       3           0

В реальной жизни буфер обычно содержит большое количество сообщений (это тоже можно настроить). Допустим, например, хранится 1000 сообщений. Для того же сценария разделы будут выглядеть так:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

Следовательно, увеличивается «визуальная» разница между разделами. Чем больше размер партии/буфера, тем печальнее будет.

Это связано с характером самого потока partitioner производителя: по умолчанию он не будет отправлять каждое сообщение отдельно, а сохраняет их, чтобы отправлять несколько сообщений при каждом вызове брокера, оптимизируя производительность системы.

Пакетная обработка является одним из основных факторов эффективности, и при пакетной обработке производитель Kafka попытается накопить данные в памяти. и отправлять большие партии в одном запросе

Этот дисбаланс может быть более заметным, если производитель остановлен/запущен, поскольку он перезапустит механизм независимо от ранее выбранных разделов (поэтому он может начать отправку в тот же раздел, который был выбран непосредственно перед остановкой, тем самым увеличивая разницу с другими невыбранные разделы из последнего выполнения).

При новом выполнении все буферы будут пустыми, поэтому процесс будет перезапущен независимо от того, какие разделы получили больше всего.

Итак, вы останавливаете процесс здесь:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

Карта, содержащая счетчик количества сообщений для каждой темы, перезапускается, так как она не является частью брокера, , а класса Partitioner от производителя. Если производитель не закрыт должным образом и/или не сброшен, эти кэшированные сообщения также будут потеряны. Итак, в этом сценарии вы получаете повторение предыдущей логики:

    MsgN % Partitions   Partition
        0%4                0
        1%4                1
        2%4                2
        3%4                3
                 (...)

Что приведет к этому в определенный момент:

    Partition    Offset
       0          2000
       1          2000
       2           0
       3           0

Это дисбаланс, вызванный прерывистым выполнением процесса отправки, но он выходит за рамки RoundRobinPartitioner, природа которого основана на непрерывном процессе (без остановок).

Вы можете проверить это поведение, проверив смещение каждого раздела при отправке сообщений: только когда выбранный раздел хранит n сообщений, следующий выбранный раздел получит свой пакет из n сообщений.

Примечание. Цифры, показанные в примерах, относятся к «идеальному» сценарию; В реальной жизни сообщения также могут быть отозваны, сжаты, неудачны, очищены независимо от размера буфера, недоступных разделов... что приводит к номерам смещения, таким как показано в вашем вопросе.

Последний пример со сценарием сброса:

    Partition    Offset
       0           1000
       1           1000
       2           0
       3           0

Процесс остановлен, но производитель правильно закрыт и сбрасывает свои сообщения, поэтому тема выглядит так:

    Partition    Offset
       0           1997
       1           1996
       2           999
       3           998

Процесс перезапускается. После сброса буфера первого раздела это будет выглядеть так:

    Partition    Offset
       0           2997
       1           1996
       2           999
       3           998

Следовательно, увеличивается путаница в отношении «капитала» механизма. Но это не его вина, поскольку в карте, счетчике и буферах разделителя нет постоянства. Если вы позволите процессу выполняться в течение нескольких дней без остановки, вы обнаружите, что он действительно уравновешивает сообщения «почти равным» образом.


Соответствующие методы RoundRobinPartitioner:

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, 
                     byte[] valueBytes, Cluster cluster) 
{
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions=cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) { 
        /*remainder calculus in order to select next partition*/
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

private int nextValue(String topic) 
{
  /*Counter of num messages sent. topicCounterMap is part of the producer 
   process, hence not persisted by default.
   It will start by 0 for every topic with each new launch*/
   AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
       return new AtomicInteger(0); });
   return counter.getAndIncrement();
}

Он не только отвечает на вопрос, но и предоставляет полезную информацию учащимся!

Arvind Kumar Avinash 09.03.2023 17:17

Другие вопросы по теме