Поддержка коннектора kafka-connect-hdfs для сохранения массива байтов и разделения полей с использованием схемы FlatBuffer

Я искал поддержку коннектора kafka-connect-hdfs (Confluent) для сохранения массива байтов и разделения полей с использованием схемы FlatBuffer.

Я получаю данные в массиве байтов от kafka. Этот массив байтов генерируется из FlatBuffer. Нужно сохранить его в HDFS по пути, скажем, Field1/Field2/Field3. Все эти поля должны быть извлечены из массива байтов с использованием схемы FlatBuffer. Кроме того, данные, которые должны быть сохранены в HDFS, должны быть только в байтах. Преобразование данных не требуется.

Я проверил оба:

  1. Разделитель поля: https://github.com/confluentinc/kafka-connect-storage-common/blob/master/partitioner/src/main/java/io/confluent/connect/storage/partitioner/FieldPartitioner.java
  2. Поддерживаемые форматы: Json, Avro, Parquet. В https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/format/json/JsonRecordWriterProvider.java, хотя я нахожу байтовый массив, сохраненный в HDFS, если данные имеют тип Kafka Struct.

Я не мог найти способ использовать их для моей цели.

Кто-нибудь знает о такой встроенной поддержке. Если нет, то, пожалуйста, направьте меня к ресурсу (если есть), чтобы создать индивидуальную поддержку для обоих.

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
227
1

Ответы 1

FlatBuffers не является (в настоящее время) поддерживаемым форматом сериализации, а ByteArrayFormat доступен только для S3 Connect, а не для HDFS, и просто выгружает формат ByteArraySerializer из Kafka (который будет объектом Struct после конвертера, да.

Что касается секционирования, поскольку данные представляют собой только байты, они не проверяют значения записи для поддержки разделителей, поэтому вам также потребуется добавить пользовательское из них, что потребует десериализации сообщения для проверки полей.

Я не уверен, почему вы связались с кодом подключения S3, но если вы хотите добавить свой собственный формат, посмотрите на подключение PR, который добавил StringFormat к HDFS.


Чтобы построить проект, посмотри на часто задаваемые вопросы

Другие вопросы по теме