Я хочу отправить большое сообщение от производителя в Kafka, поэтому я изменил свойства ниже.
Брокер (server.properties)
replica.fetch.max.bytes=317344026
message.max.bytes=317344026
max.message.bytes=317344026
max.request.size=317344026
Производитель (продюсер. Свойства)
max.request.size=3173440261
Потребитель (consumer.properties)
max.partition.fetch.bytes=327344026
fetch.message.max.bytes=317344026
Тем не менее, я получаю некоторую ошибку, показанную ниже, когда я использую python Popen и команду cli для kafta для запуска производителя.
Код:
def producer(topic_name, content):
p = subprocess.Popen(['/opt/kafka/kafka_2.11-0.9.0.0/bin/kafka-console-producer.sh', '--broker-list', 'localhost:9092', '--topic', 'Hello-Kafka'], stdout=subprocess.PIPE, stdin=subprocess.PIPE)
p.stdin.write(content)
out, err = p.communicate()
print out
Ошибка:
ERROR Error when sending message to topic Hello-Kafka with key: null, value: 1677562 bytes with error: The message is 1677588 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
И я получаю ошибку ниже, когда использую модуль python для kafka (https://github.com/dpkp/kafka-python)
Код:
def producer(topic_name, content):
p = KafkaProducer(bootstrap_servers='localhost:9092')
a = p.send(topic_name, content).get()
print a
p.flush()
p.close()
Ошибка:
kafka.errors.MessageSizeTooLargeError: [Error 10] MessageSizeTooLargeError: The message is 217344026 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration
Одна вещь, которую я успешно пробовал, - это разделение контента на куски, но если у кого-то есть решение, позволяющее сделать это без разделения контента.
Брокеры Kafka не предназначены для обработки сообщений размером 300 МБ. У вас будет низкая производительность, если у вас нет тонны свободной памяти и вы не будете экспертом в управлении памятью Linux / Java. Лучшая стратегия - разбить это. Тем не менее, в вашем примере вы не передали файл свойств производителя производителю консоли, поэтому вы не установили конфигурацию (по крайней мере, как написано).
@AbhishekN, я отправляю его прямо в строке, когда читаю ее из файла.
@dawsaw, что вы имеете в виду, что "вы не передали файл свойств производителя производителю консоли" - как я могу это сделать? Не могли бы вы дать какую-нибудь ссылку?
Вы не использовали файл Producer.properties при вызове kafka-console-producer.sh
.
.
Используйте флаг --producer.config
.
Ваш KafkaProducer использует значения по умолчанию. При вызове необходимо установить max_request_size
.
См. KafkaProducer doc
KafkaProducer(bootstrap_servers='localhost:9092', max_request_size=3173440261)
Размер вашей строки действительно огромен, это не совсем сообщение, которое будет использоваться в системе на основе очередей, переосмыслите архитектуру своей платформы. Сказав, что вы можете попробовать конфигурации сжатия и посмотреть, помогут ли они.
Kafka Сжатие данных: Есть два способа сжатия данных на Kafka: на стороне производителя и на стороне брокера. У обоих есть плюсы и минусы, я обнаружил (и я думаю, что другие тоже рекомендуют), что сжатие на стороне производителя лучше, поскольку оно дает лучшую оптимизацию партии.
"compression.codec" = "2"
"compressed.topics" = "<your-topic-name>"
(0: без сжатия, 1: сжатие GZIP, 2: быстрое сжатие, 3: сжатие LZ4)
Далее читайте: Идеи сжатия
какую сериализацию вы используете?