Kafka-Connect HDFS — Protobuf для паркета

Я пытаюсь использовать kafka-connect-hdfs, но, похоже, это не работает.

Пробовал ковыряться в настройках, но ничего не получается..

Это схема сообщения Protobuf:

syntax = "proto3";
package com.company;
option java_package = "com.company";
option java_outer_classname = "MyObjectData";
import public "wrappers.proto";
message MyObject {
int64 site_id = 1;
string time_zone = 2;
uint64 dev_id = 3;
uint64 rep_id = 4;
uint64 dev_sn = 5;
UInt64Value timestamp = 6;
UInt32Value secs = 7;
UInt64Value man_id = 8;
FloatValue panv = 9;
FloatValue outputv = 10;
FloatValue panelc = 11;
FloatValue ereset = 12;
FloatValue temp = 13;
FloatValue tempin = 14;
FloatValue tempout = 15;
UInt32Value sectelem = 16;
FloatValue energytelem = 17;
UInt32Value ecode = 18;

}

Connect-standalone.properties выглядит следующим образом:

bootstrap.servers=k1:9092,k2:9092,k3:9092


key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject
key.converter.schemas.enable=false
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/share/java

И quickstart-hdfs.properties:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=ObjectTopic
hadoop.conf.dir=/etc/hadoop
hdfs.url=hdfs://hdp-01:8020/user/hdfs/telems
hadoop.home=/etc/hadoop/client
flush.size=3
key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

transforms=SetSchemaName
transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetSchemaName.schema.name=com.acme.avro.MyObject

В настоящее время я получаю следующую ошибку:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault at org.apache.avro.Schema$Names.put(Schema.java:1128) at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701) at org.apache.avro.Schema.toString(Schema.java:324) at org.apache.avro.Schema.toString(Schema.java:314) at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:133) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:270) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:222) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:188) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:131) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:106) at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:75) at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643) at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379) at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375) at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)

Кроме того, если это имеет значение, я делаю это с помощью пользователя hdfs

Проблема в схеме? Кажется, что ничего из того, что я делаю, даже не меняет сообщение об ошибке...

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
976
1

Ответы 1

Возможно, Can't redefine: io.confluent.connect.avro.ConnectDefault связано с тем, что ваше преобразование устанавливает свойство схемы.

Вы также можете попробовать использовать AvroFormat, который возьмет внутренний объект Schema & Struct Connect и запишет в файлы Avro в HDFS.

Обратите внимание, что ParquetFormat использует parquet-avro проект, поэтому для начала данные, вероятно, должны быть Avro.

Обратите внимание на трассировку стека.

org.apache.avro.SchemaParseException ...

...

org.apache.avro.Schema$Names.put(Schema.java:1128) at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701) at org.apache.avro.Schema.toString(Schema.java:324) at org.apache.avro.Schema.toString(Schema.java:314) at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:133) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:270) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:222) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:188) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:131) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:106)

Поэтому вам нужно где-то написать конвертер protofuf-avro. Возможно, используя skeuomorph

  1. Kafka Streams или аналогичный процесс между вашим производителем и Connect (самый простой из этих вариантов)
  2. Измените проект kafka-connect-hdfs, чтобы можно было обрабатывать Protobuf
  3. Измените код ProtobufConverter, чтобы он генерировал ConnectRecord данных Avro.

Если ничего не помогает, вы можете зарегистрировать проблему об этом и посмотреть, что вы получите.

Это не проблема. Kafka Connect фактически использует конвертер в формат промежуточной схемы, который можно преобразовать в любой формат или из него. Таким образом, любой преобразователь содержит функцию, которая преобразует формат в промежуточный формат, и функцию, которая преобразует промежуточный формат в его формат. Поэтому, когда я использую формат Parquet, он должен автоматически преобразовываться по мере необходимости. Я думаю, что сначала он конвертируется в Avro. Я также проверил ваше решение, и, как я подозревал, оно не сработало. Я не думаю, что оно есть и в ProtobufConverter, потому что connect-elasticsearch работает нормально. Что-то с авро

Ben Yaakobi 18.02.2019 09:15

Формат HDFS Parqeut использует классы Avro. Посмотреть исходный код - github.com/confluentinc/kafka-connect-hdfs/blob/master/src/m‌​ain/…

OneCricketeer 20.02.2019 23:55

Я не говорил, что это не так. Я сказал, что это не имеет значения, поскольку Kafka Connect преобразует мой protobuf в avro, а затем в паркет, чтобы сгенерировать формат паркета. Вот почему превращение его в Avro тоже не работает по той же причине.

Ben Yaakobi 21.02.2019 06:41

Другие вопросы по теме