Конфигурация большого сообщения Kafka

Я прочитал много тем о настройке, но все еще не понимаю.

когда я делаю:

../kafka_2.11-1.1.0/bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my_topic --config max.message.bytes=10000000

это помогает

но в моем приложении все перепробовал

def producerSettings(system: ActorSystem): ProducerSettings[String, KafkaMessage] =
    ProducerSettings(system, new StringSerializer, new GenericSerde[KafkaMessage].serializer())
      .withBootstrapServers("localhost:9092")
      .withProperty("auto.create.topics.enable", "true")
      .withProperty("replica.fetch.max.bytes", maxSize)
      .withProperty("message.max.bytes", maxSize)

val producer: KafkaProducer[String, KafkaMessage] = configuration.producerSettings(context.system)
    .withBootstrapServers("localhost:" + config.getInt("kafka.port"))
    .withProperty("message.max.bytes", maxSize)
    .withProperty("max.request.size", maxSize)
    .createKafkaProducer()

когда начинаю чистую кафку - вижу в логах

INFO Created log for partition my_topic-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version ->
1.1-IV0, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms
-> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

a вы видите max.message.bytes -> 1000012 по-прежнему со значением по умолчанию и мне снова нужно большое исключение сообщения записи

что я делаю не так? Я хочу настроить свое приложение, а не консольную команду. Является ли это возможным?

Вам необходимо установить свойство у брокера, а не только у производителя ... Брокер настроен на прием только максимального размера

OneCricketeer 30.08.2018 15:37
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
1
555
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вероятно, это связано с тем, что ваша тема автоматически создается брокером. Когда тема создается с использованием автоматического создания брокером, она использует конфигурацию брокера по умолчанию.

Вы можете либо изменить конфигурацию по умолчанию в брокере, то есть message.max.bytes, но я бы не рекомендовал это, поскольку конфигурация брокера будет применяться ко всем другим автоматически созданным темам.

Или вы можете создать тему явно в своем приложении с желаемой конфигурацией, чтобы брокер не создавал тему автоматически с конфигурацией по умолчанию.

Другие вопросы по теме