Как настроить Kafka Connect Worker для потоковой передачи большего количества сообщений в HDFS

Моя текущая рабочая установка:

NiFi передает сообщения Avro (ссылка на реестр Confluent Schema) в Kafka (v2.0.0, 20 разделов, Confluent v5.0.0), Kafka Connect Worker (приемник HDFS) передает эти сообщения в формате Parquet в HDFS с помощью flush.size=70000.

Моя проблема:

Эта конфигурация работает нормально, но когда я меняю конфигурацию на flush.size=1000000 (поскольку размер сообщения 70 КБ составляет максимум 5-7 МБ, а размер блока файла Parquet составляет 256 МБ), рабочий процесс подключения возвращает Error sending fetch request ошибки:

...
[2019-05-24 14:00:21,784] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1661483807, epoch=374) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-05-24 14:00:21,784] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData = {mytopic-10=(offset=27647797, logStartOffset=24913298, maxBytes=1048576), mytopic-16=(offset=27647472, logStartOffset=24913295, maxBytes=1048576), mytopic-7=(offset=27647429, logStartOffset=24913298, maxBytes=1048576), mytopic-4=(offset=27646967, logStartOffset=24913296, maxBytes=1048576), mytopic-13=(offset=27646404, logStartOffset=24913298, maxBytes=1048576), mytopic-19=(offset=27648276, logStartOffset=24913300, maxBytes=1048576), mytopic-1=(offset=27647036, logStartOffset=24913307, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1661483807, epoch=374)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
...

Мои конфиги:

Конфигурация коннектора HDFS:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=mytopic
hdfs.url=hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/
flush.size=1000000

Конфигурация Kafka Connect Worker:

bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/confluent/current/share/java/

Мой вопрос:

Как передавать сообщения большего размера из Kafka в HDFS с помощью Kafka Connect Worker?

Вместо этого вы можете использовать разделитель на основе времени, например, для получения почасовых разделов, а не просто ограничивать файлы только количеством событий. Таким образом мне удалось получить несколько файлов ГБ в S3.

OneCricketeer 31.05.2019 03:21

@cricket_007, я знаю, но мой вопрос об ошибке. Как изменить мою конфигурацию, чтобы она работала с flush.size=1000000?

deeplay 31.05.2019 05:50

Это должно просто работать ... Однако вы можете посмотреть на увеличение размера кучи. Лично мы в конечном итоге храним небольшие файлы в Avro, а затем периодически перезаписываем и загружаем их в другие таблицы Parquet.

OneCricketeer 01.06.2019 06:52
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
1
3
584
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я решил эту проблему, запустив соединение в распределенном режиме (вместо автономного). Теперь я могу записать в HDFS до 3,5 миллионов записей (~256 МБ). Но тут есть новые проблемы: 1) очень низкая скорость обработки (35 млн записей за 1 час); 2) невозможность записи паркетных файлов размером более 256 Мб. Я опубликую новые вопросы SO.

Другие вопросы по теме