LinkedHashMap заменяется на HashMap и аварийно завершает работу в операторах потока данных flink

Учитывая этот фиктивный код:

 1 case class MyObject(values:mutable.LinkedHashMap[String, String])

...

 2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
 3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
 4
 5    val env = StreamExecutionEnvironment.getExecutionEnvironment
 6
 7    env
 8      .fromElements("one")
 9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })

Приложение вылетит в строке 18, за исключением:

Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap

Похоже, проблема в том, что при сериализации/десериализации элемент values меняет свой тип объекта, или, другими словами, LinkedHashMap превращается в HashMap.

Обратите внимание, что тот же код, что и в строке 18, отлично работает в строке 12.

При установке точки останова на строку 12 obj.values будет отображаться как LinkedHashMap отладчиком/IntelliJ, однако точка останова в строке 18 будет отображаться obj.values как HashMap в отладчике.

Что здесь происходит? Как я могу это исправить? Ведь LinkedHashMap реализует Serializable?!

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
0
284
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Сериализатор Kryo Chill по умолчанию для LinkedHashMap не сохраняет тип карты и вместо этого десериализует данные в HashMap. Чтобы этого избежать, нужно создать сериализатор для типа LinkedHashMap:

class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
  override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
    kryo.writeObject(output, `object`.size)

    for (elem <- `object`.iterator) {
      kryo.writeClassAndObject(output, elem._1)
      kryo.writeClassAndObject(output, elem._2)
    }
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = kryo.readObject(input, classOf[Int])
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }

    result
  }
}

А затем зарегистрируйте его как Kryo Serializer:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())

Спасибо за ответ. С пользовательским сериализатором, похоже, работает. Однако я не уверен, для чего хорош val serializer = new TraversableSerializer ... ? Кажется, он не используется. Кроме того, когда я импортирую org.apache.flink.api.scala.typeutils.TraversableSerializer, он жалуется на параметр false (ожидается параметр типа TypeSerializer).

user826955 11.04.2019 09:21

Жаль, что не должно было быть там. Я исправил второй фрагмент.

Till Rohrmann 11.04.2019 09:23

Хорошо, на самом деле это работает для моего минимального примера, но не работает с моим реальным кодом. В фактической реализации LinkedHashMap скрыт в более сложной структуре класса case, и работа с вашими изменениями теперь выдает ExceptionInChainedOperatorException, вызванный KryoException - он жалуется на «Невозможно найти класс» (мои задействованные классы case). Означает ли это, что теперь мне придется предоставлять собственные сериализаторы для всех моих задействованных классов case и промежуточных классов? Я имею в виду, что это работало до изменений.

user826955 11.04.2019 09:34

Сделал несколько более простых тестов, и кажется, что пользовательский сериализатор не может обрабатывать вложенные типы в LinkedHashMap. Он бросает, например. com.esotericsoftware.kryo.KryoException: Unable to find class: Xscala.None$, и подходит для любого вовлеченного типа, не относящегося к POJO.

user826955 11.04.2019 09:55

Я еще раз обновил реализацию Serializer, чтобы использовать kryo.writeClassAndObject вместо kryo.writeObject. Я надеюсь, что это работает сейчас.

Till Rohrmann 11.04.2019 10:22

Другие вопросы по теме