Spark - kryo против javaserialization. тот же размер?

Я использую кеш со Spark. Сейчас я использую несколько кешей, и некоторые из них занимают около 20 ГБ памяти. Я попробовал сначала с cache (), а затем с persist и MEMORY_SER, и размер был огромным, поэтому я перешел на сериализацию java, получив около 20 ГБ в некоторых из них. Теперь я хочу использовать Kryo, я зарегистрировал классы, и я не получаю никаких ошибок, но размер такой же, как когда я выполняю его с Kryo в большинстве кешей.

Некоторые из объектов, которые я хочу кэшировать, выглядят так:

case class ObjectToCache(id: Option[Long],
                      listObject1: Iterable[ObjectEnriched],
                       mp1: Map[String, ObjectEnriched2],
                       mp2: Map[String, ObjectEnriched3],
                       mp3: Map[String, ObjectEnriched4])

Я зарегистрировался в Крио как:

kryo.register(classOf[Iterable[ObjectEnriched2]])
kryo.register(classOf[Map[String,ObjectEnriched3]])
kryo.register(classOf[Map[String,ObjectEnriched4]])
kryo.register(ObjectEnriched)
kryo.register(ObjectEnriche2)
kryo.register(ObjectEnriched3)
kryo.register(ObjectEnriched4)

Я делаю что-то неправильно? есть ли способ узнать, использует ли он Kryo? Я думаю, что он используется, потому что в какой-то момент у меня возникла ошибка, потому что у меня не осталось места как:

Serialization trace:
mp1 (ObjectEnriched)
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:183)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)

Я использую RDD со Spark Streaming.

Покажите код, который вы делаете сериализацию, пожалуйста

John 10.08.2018 16:33

Какой код? Я думаю, вам нужно только включить флаг в Spark, указать ему класс, и когда он будет кэшироваться, он будет использовать Kryo. conf.set ("spark.kryo.registrationRequired", "true") conf.set ("spark.serializer", "org.apache.spark.serializer.Kry‌ oSerializer") conf.set ("spark.kryo.registrator "," com.orange.iris.common.serializer.CustomKryoRegistrator ")

Guille 10.08.2018 16:41
0
2
869
2

Ответы 2

Am I doing something wrong? is there any way to know if it uses Kryo?

Вы действительно используете kryo, и он правильно сериализует ваши объекты.

Если вы установите флаг:

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrator","com.orange.iris.common.serializer.CustomKryoRegistrator")

Тогда он обязательно будет использовать сериализатор Kryo. Кроме того, поскольку вы включаете флаг:

conf.set("spark.kryo.registrationRequired", "true") 

он потерпит неудачу, если попытается сериализовать незарегистрированный класс (см. этот отвечать для получения дополнительной информации).

Сколько у тебя памяти? Если ваши размеры примерно одинаковы при использовании сериализации Java и Kryo, и вы продолжаете использовать MEMORY_ONLY_SER, тогда возможно, что ваши разделы все еще не помещаются в памяти, даже если использование Kryo и Spark пересчитывает части, которые не подходят на лету . Это приведет к тому, что размеры будут одинаковыми.

Есть несколько способов выяснить это: запустить задание и сохранить его в MEMORY_AND_DISK_SER, а затем проверить, не пролился ли диск при использовании Kryo. См. здесь для получения дополнительной информации об уровнях хранения.

Я уже использую RegistrationRequired, и я добавлял все классы ... странно, что размер почти такой же, используя сериализацию java и kryo, поэтому я не уверен, использует ли он Kryo. Мне интересно, не потому ли, что он не может должным образом сериализовать мои классы, потому что они используют Map, Iterable или какой-то сложный объект.

Guille 10.08.2018 21:51

Чтобы проверить, кэширован ли фрейм данных (DF) или нет Просто запустите кеширование, вызвав действие df.show, и проверьте пользовательский интерфейс искры в http: // локальный: 4040 / хранилище, чтобы увидеть, кэшируется ли DF. Вы должны это увидеть.

Вы также можете использовать queryExecution или объяснять, чтобы увидеть InMemoryRelation

scala> df.queryExecution.withCachedData

res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan = InMemoryRelation [id # 0L], true, 10000, StorageLevel (диск, память, десериализовано, 1 реплика) + - * Range (0, 1, step = 1, splits = Some (8))

Также попробуйте использовать наборы данных вместо DataFrame. DataSet не использует стандартные методы сериализации. Они используют специализированное столбчатое хранилище с собственными методами сжатия, и вам даже не нужно хранить набор данных с помощью сериализатора Kryo.

Другие вопросы по теме