Исключение java.lang.Instantiation при десериализации потока байтов в объект класса case Scala

Я пытаюсь десериализовать поток байтов avro в объект класса scala case. По сути, у меня был поток kafka с потоком данных в кодировке avro, и теперь есть дополнение к схеме, и я пытаюсь обновить класс case scala, чтобы включить новое поле. Класс case выглядит так

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String] = None
               )  {

this () = это («нет», «нет», «нет», 0, нет) }

Схема авро выглядит следующим образом:

{
  "type": "record",
  "name": "some_name",
  "namespace": "some_namespace",
  "fields": [
    {
      "name": "deviceId",
      "type": "string"
    },
    {
      "name": "sw_version",
      "type": "string"
    }, 
    {
      "name": "timestamp",
      "type": "string"
    },
    {
      "name": "reading",
      "type": "double"
    },
    {
      "name": "new_field",
     "type": ["null", "string"],
      "default": null
    }]}

Когда данные получены, я получаю следующее исключение:

java.lang.RuntimeException: java.lang.InstantiationException

Я могу получать данные просто отлично, как потребитель, написанный на python, поэтому я знаю, что данные передаются правильно в правильном формате. Я подозреваю, что проблема связана с созданием конструктора класса case, я пытался сделать это:

/** Case class to hold the Device data. */
case class DeviceData(deviceId: String,
                sw_version: String,
                timestamp: String,
                reading: Double,
                new_field: Option[String]
               )  {
this() = this("na", "na", "na", 0, some("na"))
}

но не повезло.

Код десериализатора (выдержки):

// reader and decoder for reading avro records
private var reader: DatumReader[T] = null
private var decoder : BinaryDecoder = null
decoder = DecoderFactory.get.binaryDecoder(message, decoder)
reader.read(null.asInstanceOf[T], decoder)

Я не смог найти никаких других примеров наличия конструкторов для классов case, которые используются для десериализации avro, в прошлом году я опубликовал соответствующий вопрос java.lang.NoSuchMethodException для метода инициализации в классе case Scala, и на основе ответа я смог реализовать свой текущий код, который с тех пор работает нормально.

Где схема авро? И у вас есть null декодер... null читатель... ну, decoder и reader имеют значение в этом вопросе. Если ваши decoder и reader определены правильно, это не сработает.

sarveshseri 30.05.2019 10:01

Я добавил схему avro, см. выше. Также нулевой считыватель и декодер определены правильно, это весь рабочий код, за исключением того, что добавление «new_field» вызывает исключение java, я попытался добавить println (decoder.getString()), и я могу увидеть первую строку поле (только что сделал это, чтобы увидеть, генерируется ли исключение DecoderFactory или частью чтения, оно генерируется reader.read).

Amit Arora 30.05.2019 15:16
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
2
507
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я решил эту проблему, следуя совершенно другому подходу. Я использовал клиент Confluent Kafka, как показано в этом примере https://github.com/jfrazee/schema-registry-examples/tree/master/src/main/scala/io/atomicfinch/examples/flink. У меня также есть реестр схемы Confluent, который очень легко настроить с помощью контейнерного решения «все в одном», которое поставляется с kafka и реестром схемы https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html.

Мне пришлось добавить слитные зависимости и репозиторий в файл pom.xml. Это идет в разделе репозитория.

<repository>
    <id>confluent</id>
    <url>http://packages.confluent.io/maven/</url>
</repository>

Это идет в разделе зависимостей:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-confluent-registry</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <!-- For Confluent Platform 5.2.1 -->
    <version>5.2.1</version>
</dependency>

С кодом, представленным в https://github.com/jfrazee/schema-registry-examples/blob/master/src/main/scala/io/atomicfinch/examples/flink/ConfluentRegistryDeserializationSchema.scala, я смог поговорить с реестром схемы Confluent, а затем на основе идентификатора схемы в заголовке сообщения avro это загружает схему из схемы reg и возвращает мне объект GenericRecord, из которого я могу легко все интересующие поля и создать новый DataStream объекта DeviceData.

val kafka_consumer = new FlinkKafkaConsumer010("prod.perfwarden.minute",
  new ConfluentRegistryDeserializationSchema[GenericRecord](classOf[GenericRecord], "http://localhost:8081"),
  properties)
val device_data_stream = env
  .addSource(kafka_consumer)
  .map({x => new DeviceData(x.get("deviceId").toString,
    x.get("sw_version").toString,
    x.get("timestamp").toString,
    x.get("reading").toString.toDouble,
    x.get("new_field").toString)})

Слитный клиент kafka заботится о десериализации потока байтов avro в соответствии со схемой, включая значения по умолчанию. Настройка реестра схем и использование слитного клиента kafka может занять немного времени, чтобы привыкнуть, но, вероятно, это лучшее долгосрочное решение, просто мои 2 цента.

Другие вопросы по теме