Невозможно отправлять большие сообщения в Kafka

Я хочу отправить большое сообщение от производителя в Kafka, поэтому я изменил свойства ниже.

Брокер (server.properties)

replica.fetch.max.bytes=317344026
message.max.bytes=317344026
max.message.bytes=317344026
max.request.size=317344026

Производитель (продюсер. Свойства)

max.request.size=3173440261

Потребитель (consumer.properties)

max.partition.fetch.bytes=327344026
fetch.message.max.bytes=317344026

Тем не менее, я получаю некоторую ошибку, показанную ниже, когда я использую python Popen и команду cli для kafta для запуска производителя.

Код:

def producer(topic_name, content):
    p = subprocess.Popen(['/opt/kafka/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh', '--broker-list', 'localhost:9092', '--topic', 'Hello-Kafka'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    p.stdin.write(content)
    out, err = p.communicate()
    print out

Ошибка:

ERROR Error when sending message to topic Hello-Kafka with key: null, value: 1677562 bytes with error: The message is 1677588 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

И я получаю ошибку ниже, когда использую модуль python для kafka (https://github.com/dpkp/kafka-python)

Код:

def producer(topic_name, content):
    p = KafkaProducer(bootstrap_servers='localhost:9092')
    a = p.send(topic_name, content).get()
    print a    
    p.flush()
    p.close()

Ошибка:

kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError: The message is 217344026 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration

Одна вещь, которую я успешно пробовал, - это разделение контента на куски, но если у кого-то есть решение, позволяющее сделать это без разделения контента.

какую сериализацию вы используете?

AbhishekN 09.08.2018 18:43

Брокеры Kafka не предназначены для обработки сообщений размером 300 МБ. У вас будет низкая производительность, если у вас нет тонны свободной памяти и вы не будете экспертом в управлении памятью Linux / Java. Лучшая стратегия - разбить это. Тем не менее, в вашем примере вы не передали файл свойств производителя производителю консоли, поэтому вы не установили конфигурацию (по крайней мере, как написано).

dawsaw 10.08.2018 05:44

@AbhishekN, я отправляю его прямо в строке, когда читаю ее из файла.

Vatsal Jagani 10.08.2018 06:22

@dawsaw, что вы имеете в виду, что "вы не передали файл свойств производителя производителю консоли" - как я могу это сделать? Не могли бы вы дать какую-нибудь ссылку?

Vatsal Jagani 10.08.2018 06:25
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
4
3 268
2

Ответы 2

kafka-console-producer.sh

Вы не использовали файл Producer.properties при вызове kafka-console-producer.sh.
. Используйте флаг --producer.config.

КафкаПродюсер

Ваш KafkaProducer использует значения по умолчанию. При вызове необходимо установить max_request_size. См. KafkaProducer doc

KafkaProducer(bootstrap_servers='localhost:9092', max_request_size=3173440261)

Размер вашей строки действительно огромен, это не совсем сообщение, которое будет использоваться в системе на основе очередей, переосмыслите архитектуру своей платформы. Сказав, что вы можете попробовать конфигурации сжатия и посмотреть, помогут ли они.

Kafka Сжатие данных: Есть два способа сжатия данных на Kafka: на стороне производителя и на стороне брокера. У обоих есть плюсы и минусы, я обнаружил (и я думаю, что другие тоже рекомендуют), что сжатие на стороне производителя лучше, поскольку оно дает лучшую оптимизацию партии.

"compression.codec" = "2"
"compressed.topics" = "<your-topic-name>"

(0: без сжатия, 1: сжатие GZIP, 2: быстрое сжатие, 3: сжатие LZ4)

Далее читайте: Идеи сжатия

Другие вопросы по теме