Распределение сообщений kafka между потребителями

У меня очень простой вариант использования kafka, когда я сталкиваюсь с проблемой распределения сообщений между двумя разделами.

У меня 2 раздела по теме и у меня по 2 потребителя на каждый. Я вижу, что больше сообщений отправлено в определенный раздел, и только один потребитель получает сообщения для обработки, а другой, который подписан на раздел с меньшим количеством сообщений, бездействует вечно. оба потребителя имеют одинаковый идентификатор группы. Я не могу добиться горизонтального масштабирования с этой проблемой.

Ниже приведены ключевые конфиги, которые я помещаю.

kafka.session.timeout.ms=10000
kafka.auto.commit=false
kafka.maxpoll.interval.ms=50000
kafka.request.timeout.ms=15000
kafka.maxpoll.records=100

**PS:**имена взяты из моего проп-файла и не совсем совпадают с реальными именами свойств kafka. Мне нужен большой максимальный интервал опроса, чтобы обработать большой кусок за один раз. Есть предположения, что мне нужно добавить в конфиг или изменить его?

Вы используете ключ для ваших сообщений? Если да, то как распределяются эти ключи? Kafka использует хэш ключа для распространения. Если ключи не случайны или распределены неравномерно, потребительская нагрузка также будет искажена.

senseiwu 01.03.2019 21:41

Мои ключи представляют собой простые порядковые номера сообщений в строковом формате. Это справедливое разделение для потребителей. Проблема в том, что я недавно добавил второй раздел, и на нем не так много сообщений.

Tukaram Bhosale 02.03.2019 11:07
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
2
1 389
2

Ответы 2

Производители Кафка Продюсер: отправляют записи в раздел на основе ключа записи. Разделитель по умолчанию для Java использует хэш ключа записи для выбора раздела или использует стратегию циклического перебора, если запись не имеет ключа. Поэтому, чтобы быть более масштабируемым, всегда используйте уникальный ключ для сообщения.

Producers publish data to the topics of their choice. The producer is responsible for choosing which record to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the record). More on the use of partitioning in a second!

Если записи имеют одинаковый ключ, они будут заканчиваться в одном разделе

Вы также можете отправить запись в определенный раздел

public ProducerRecord(String topic,
          Integer partition,
          K key,
          V value)

Creates a record to be sent to a specified topic and partition

Пользователь может изменить разделитель по умолчанию (в соответствии со своими потребностями), перезаписав свойство производителя partitioner.class.

Bartosz Wardziński 04.03.2019 09:45

Да согласен. все контролируется только производителем. Я узнал о проблеме на стороне производителя, которая заключалась в том, что они создают сообщения с одним и тем же ключом, которые, в свою очередь, появляются в том же разделе на брокере.

Tukaram Bhosale 25.05.2019 20:01

Как упоминалось в другом ответе, кафка использует хэш ключа для определения раздела. Возможно, ваш ключ распределен неравномерно. В таких случаях вы можете определить свою собственную стратегию выбора раздела по производителю при создании записей. Создайте собственный класс разделителей и реализуйте его метод разделения следующим образом.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;

public class CustomPartitioner implements Partitioner {

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException("We expect all messages to have a key");
        // Your logic to decide partition based on key
        return 0;// Here return thr partition decided based on key
    }

    public void close() {
    }

    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }
}

В конфигурации производителя добавьте следующее

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getCanonicalName());
property 

Спасибо за информацию. У меня нет контроля над выбором раздела на стороне производителя. Есть ли какие-либо настройки, которые вызовут ребаланс? В случае истечения максимального интервала опроса сообщение повторно доставляется другому потребителю, но я не хочу избегать дублирования.

Tukaram Bhosale 02.03.2019 11:04

Свойство enable.auto.commit используется для хранения фиксации смещения для раздела, всякий раз, когда происходит перебалансировка, потребитель начинает чтение из фиксации смещения.

Nawnit Sen 02.03.2019 12:50

Другие вопросы по теме