Я пытаюсь десериализовать поток байтов avro в объект класса scala case. По сути, у меня был поток kafka с потоком данных в кодировке avro, и теперь есть дополнение к схеме, и я пытаюсь обновить класс case scala, чтобы включить новое поле. Класс case выглядит так
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String] = None
) {
this () = это («нет», «нет», «нет», 0, нет) }
Схема авро выглядит следующим образом:
{
"type": "record",
"name": "some_name",
"namespace": "some_namespace",
"fields": [
{
"name": "deviceId",
"type": "string"
},
{
"name": "sw_version",
"type": "string"
},
{
"name": "timestamp",
"type": "string"
},
{
"name": "reading",
"type": "double"
},
{
"name": "new_field",
"type": ["null", "string"],
"default": null
}]}
Когда данные получены, я получаю следующее исключение:
java.lang.RuntimeException: java.lang.InstantiationException
Я могу получать данные просто отлично, как потребитель, написанный на python, поэтому я знаю, что данные передаются правильно в правильном формате. Я подозреваю, что проблема связана с созданием конструктора класса case, я пытался сделать это:
/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
sw_version: String,
timestamp: String,
reading: Double,
new_field: Option[String]
) {
this() = this("na", "na", "na", 0, some("na"))
}
но не повезло.
Код десериализатора (выдержки):
// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)
Я не смог найти никаких других примеров наличия конструкторов для классов case, которые используются для десериализации avro, в прошлом году я опубликовал соответствующий вопрос java.lang.NoSuchMethodException для метода инициализации в классе case Scala, и на основе ответа я смог реализовать свой текущий код, который с тех пор работает нормально.
Я добавил схему avro, см. выше. Также нулевой считыватель и декодер определены правильно, это весь рабочий код, за исключением того, что добавление «new_field» вызывает исключение java, я попытался добавить println (decoder.getString()), и я могу увидеть первую строку поле (только что сделал это, чтобы увидеть, генерируется ли исключение DecoderFactory или частью чтения, оно генерируется reader.read).
Я решил эту проблему, следуя совершенно другому подходу. Я использовал клиент Confluent Kafka, как показано в этом примере https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. У меня также есть реестр схемы Confluent, который очень легко настроить с помощью контейнерного решения «все в одном», которое поставляется с kafka и реестром схемы https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html.
Мне пришлось добавить слитные зависимости и репозиторий в файл pom.xml. Это идет в разделе репозитория.
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
Это идет в разделе зависимостей:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<!-- For Confluent Platform 5.2.1 -->
<version>5.2.1</version>
</dependency>
С кодом, представленным в https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala, я смог поговорить с реестром схемы Confluent, а затем на основе идентификатора схемы в заголовке сообщения avro это загружает схему из схемы reg и возвращает мне объект GenericRecord, из которого я могу легко все интересующие поля и создать новый DataStream объекта DeviceData.
val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
properties)
val device_data_stream = env
.addSource(kafka_consumer)
.map({x => new DeviceData(x.get("deviceId").toString,
x.get("sw_version").toString,
x.get("timestamp").toString,
x.get("reading").toString.toDouble,
x.get("new_field").toString)})
Слитный клиент kafka заботится о десериализации потока байтов avro в соответствии со схемой, включая значения по умолчанию. Настройка реестра схем и использование слитного клиента kafka может занять немного времени, чтобы привыкнуть, но, вероятно, это лучшее долгосрочное решение, просто мои 2 цента.
Где схема авро? И у вас есть
null
декодер...null
читатель... ну,decoder
иreader
имеют значение в этом вопросе. Если вашиdecoder
иreader
определены правильно, это не сработает.