Учитывая этот фиктивный код:
1 case class MyObject(values:mutable.LinkedHashMap[String, String])
...
2 implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
3 implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
4
5 val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7 env
8 .fromElements("one")
9 .map(str =>
10 {
11 val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12 val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14 obj
15 })
16 .map(obj =>
17 {
18 val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20 obj
21 })
Приложение вылетит в строке 18, за исключением:
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap
Похоже, проблема в том, что при сериализации/десериализации элемент values
меняет свой тип объекта, или, другими словами, LinkedHashMap
превращается в HashMap
.
Обратите внимание, что тот же код, что и в строке 18, отлично работает в строке 12.
При установке точки останова на строку 12 obj.values
будет отображаться как LinkedHashMap
отладчиком/IntelliJ, однако точка останова в строке 18 будет отображаться obj.values
как HashMap
в отладчике.
Что здесь происходит? Как я могу это исправить? Ведь LinkedHashMap
реализует Serializable
?!
Сериализатор Kryo Chill по умолчанию для LinkedHashMap
не сохраняет тип карты и вместо этого десериализует данные в HashMap
. Чтобы этого избежать, нужно создать сериализатор для типа LinkedHashMap
:
class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
kryo.writeObject(output, `object`.size)
for (elem <- `object`.iterator) {
kryo.writeClassAndObject(output, elem._1)
kryo.writeClassAndObject(output, elem._2)
}
}
override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
val result = new mutable.LinkedHashMap[K, V]()
val size = kryo.readObject(input, classOf[Int])
for (_ <- 1 to size) {
val key = kryo.readClassAndObject(input).asInstanceOf[K]
val value = kryo.readClassAndObject(input).asInstanceOf[V]
result.put(key, value)
}
result
}
}
А затем зарегистрируйте его как Kryo Serializer
:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
Жаль, что не должно было быть там. Я исправил второй фрагмент.
Хорошо, на самом деле это работает для моего минимального примера, но не работает с моим реальным кодом. В фактической реализации LinkedHashMap
скрыт в более сложной структуре класса case, и работа с вашими изменениями теперь выдает ExceptionInChainedOperatorException
, вызванный KryoException
- он жалуется на «Невозможно найти класс» (мои задействованные классы case). Означает ли это, что теперь мне придется предоставлять собственные сериализаторы для всех моих задействованных классов case и промежуточных классов? Я имею в виду, что это работало до изменений.
Сделал несколько более простых тестов, и кажется, что пользовательский сериализатор не может обрабатывать вложенные типы в LinkedHashMap
. Он бросает, например. com.esotericsoftware.kryo.KryoException: Unable to find class: Xscala.None$
, и подходит для любого вовлеченного типа, не относящегося к POJO.
Я еще раз обновил реализацию Serializer
, чтобы использовать kryo.writeClassAndObject
вместо kryo.writeObject
. Я надеюсь, что это работает сейчас.
Спасибо за ответ. С пользовательским сериализатором, похоже, работает. Однако я не уверен, для чего хорош
val serializer = new TraversableSerializer ...
? Кажется, он не используется. Кроме того, когда я импортируюorg.apache.flink.api.scala.typeutils.TraversableSerializer
, он жалуется на параметрfalse
(ожидается параметр типаTypeSerializer
).