Я пытаюсь использовать kafka-connect-hdfs, но, похоже, это не работает.
Пробовал ковыряться в настройках, но ничего не получается..
Это схема сообщения Protobuf:
syntax = "proto3";
package com.company;
option java_package = "com.company";
option java_outer_classname = "MyObjectData";
import public "wrappers.proto";
message MyObject {
int64 site_id = 1;
string time_zone = 2;
uint64 dev_id = 3;
uint64 rep_id = 4;
uint64 dev_sn = 5;
UInt64Value timestamp = 6;
UInt32Value secs = 7;
UInt64Value man_id = 8;
FloatValue panv = 9;
FloatValue outputv = 10;
FloatValue panelc = 11;
FloatValue ereset = 12;
FloatValue temp = 13;
FloatValue tempin = 14;
FloatValue tempout = 15;
UInt32Value sectelem = 16;
FloatValue energytelem = 17;
UInt32Value ecode = 18;
}
Connect-standalone.properties выглядит следующим образом:
bootstrap.servers=k1:9092,k2:9092,k3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject
key.converter.schemas.enable=false
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
И quickstart-hdfs.properties:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=ObjectTopic
hadoop.conf.dir=/etc/hadoop
hdfs.url=hdfs://hdp-01:8020/user/hdfs/telems
hadoop.home=/etc/hadoop/client
flush.size=3
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.blueapron.connect.protobuf.ProtobufConverter
value.converter.protoClassName=com.company.MyObjectData$MyObject
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
transforms=SetSchemaName
transforms.SetSchemaName.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.SetSchemaName.schema.name=com.acme.avro.MyObject
В настоящее время я получаю следующую ошибку:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.avro.SchemaParseException: Can't redefine: io.confluent.connect.avro.ConnectDefault at org.apache.avro.Schema$Names.put(Schema.java:1128) at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701) at org.apache.avro.Schema.toString(Schema.java:324) at org.apache.avro.Schema.toString(Schema.java:314) at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:133) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:270) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:222) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:188) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:131) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:106) at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:75) at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:643) at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379) at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:375) at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:109) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
Кроме того, если это имеет значение, я делаю это с помощью пользователя hdfs
Проблема в схеме? Кажется, что ничего из того, что я делаю, даже не меняет сообщение об ошибке...





Возможно, Can't redefine: io.confluent.connect.avro.ConnectDefault связано с тем, что ваше преобразование устанавливает свойство схемы.
Вы также можете попробовать использовать AvroFormat, который возьмет внутренний объект Schema & Struct Connect и запишет в файлы Avro в HDFS.
Обратите внимание, что ParquetFormat использует parquet-avro проект, поэтому для начала данные, вероятно, должны быть Avro.
Обратите внимание на трассировку стека.
org.apache.avro.SchemaParseException ...
...
org.apache.avro.Schema$Names.put(Schema.java:1128) at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690) at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882) at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716) at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701) at org.apache.avro.Schema.toString(Schema.java:324) at org.apache.avro.Schema.toString(Schema.java:314) at org.apache.parquet.avro.AvroWriteSupport.init(AvroWriteSupport.java:133) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:270) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:222) at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:188) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:131) at org.apache.parquet.avro.AvroParquetWriter.(AvroParquetWriter.java:106)
Поэтому вам нужно где-то написать конвертер protofuf-avro. Возможно, используя skeuomorph
kafka-connect-hdfs, чтобы можно было обрабатывать ProtobufProtobufConverter, чтобы он генерировал ConnectRecord данных Avro.Если ничего не помогает, вы можете зарегистрировать проблему об этом и посмотреть, что вы получите.
Формат HDFS Parqeut использует классы Avro. Посмотреть исходный код - github.com/confluentinc/kafka-connect-hdfs/blob/master/src/main/…
Я не говорил, что это не так. Я сказал, что это не имеет значения, поскольку Kafka Connect преобразует мой protobuf в avro, а затем в паркет, чтобы сгенерировать формат паркета. Вот почему превращение его в Avro тоже не работает по той же причине.
Это не проблема. Kafka Connect фактически использует конвертер в формат промежуточной схемы, который можно преобразовать в любой формат или из него. Таким образом, любой преобразователь содержит функцию, которая преобразует формат в промежуточный формат, и функцию, которая преобразует промежуточный формат в его формат. Поэтому, когда я использую формат Parquet, он должен автоматически преобразовываться по мере необходимости. Я думаю, что сначала он конвертируется в Avro. Я также проверил ваше решение, и, как я подозревал, оно не сработало. Я не думаю, что оно есть и в ProtobufConverter, потому что connect-elasticsearch работает нормально. Что-то с авро