Spark, NegativeArraySizeException, когда файл последовательности

У меня искра 2.3.

У меня есть этот фрагмент кода, который читает файлы последовательности в разделе «hdfspath» (по этому пути около 20 файлов, и каждый файл составляет около 60 МБ),

SparkSession spark = ...;
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaPairRDD<BytesWritable, BytesWritable> temp = jsc.sequenceFile(hdfspath, BytesWritable.class, BytesWritable.class);
temp.take(1);

И это дает мне эту ошибку,

19/04/03 14:50:18 INFO CodecPool: Got brand-new decompressor [.gz]
19/04/03 14:50:18 INFO CodecPool: Got brand-new decompressor [.gz]
19/04/03 14:50:18 INFO CodecPool: Got brand-new decompressor [.gz]
19/04/03 14:50:18 INFO CodecPool: Got brand-new decompressor [.gz]
19/04/03 14:50:18 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NegativeArraySizeException
    at org.apache.hadoop.io.BytesWritable.setCapacity(BytesWritable.java:144)
    at org.apache.hadoop.io.BytesWritable.setSize(BytesWritable.java:123)
    at org.apache.hadoop.io.BytesWritable.readFields(BytesWritable.java:179)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.io.SequenceFile$Reader.deserializeKey(SequenceFile.java:2606)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2597)
    at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:82)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:277)

Файлы hdfs, которые я пытаюсь прочитать, являются результатом старой работы mapreduce с такой настройкой вывода,

job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setOutputFormatClass(SequenceFileAsBinaryOutputFormat.class);
SequenceFileAsBinaryOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

Я просмотрел метод org.apache.hadoop.io.BytesWritable.setCapacity(...),

public void setSize(int size) {
if (size > getCapacity()) {
  setCapacity(size * 3 / 2);
}
this.size = size;
}

Каким-то образом параметр размера равен 808464432 и вызывает переполнение при выполнении size*3, что в конечном итоге вызывает исключение NegativeArraySizeException.

Может ли кто-нибудь помочь объяснить, как это происходит, и как это исправить?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
415
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Разобрался. Используйте JavaSparkContext#newAPIHadoopFile вместо JavaSparkContext#sequenceFile.

Другие вопросы по теме