Каждый день я получаю zip-архив «2018-06-26.zip» размером ок. Сжатый 250 Мб, содержащий 165-170 000 небольших файлов XML (КБ). Я загружаю zip-архив в HDFS (избегая проблемы с небольшими файлами) и использую SPARK для их извлечения из zip (zip-архивы не разделяются), создавая парный RDD с именем файла в качестве ключа и содержимым в качестве значения и сохраняю их как Sequence-файл через парный RDD. Все работает гладко с небольшим zip-архивом, содержащим всего 3 XML-файла для целей тестирования, но когда я загружаю его большой архив, я получаю
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
...
...
Я использую Cloudera Quickstart VM: CDH 5.13.3 (HDFS: 2.60, JDK: 1.7.0.67, SPARK: 1.6.0, Scala 2.10)
Я еще не запускал его на полномасштабном кластере, так как хотел убедиться, что мой код верен, прежде чем развертывать его ...
Сборщик мусора продолжает выполнять OOM с превышением предела накладных расходов. Я знаю об увеличении объема памяти для драйвера и Java Heap Space, но подозреваю, что мой подход забирает слишком много памяти .... Мониторинг использования памяти не обнаруживает утечки памяти ...
Вот код:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")
Любая помощь или идеи приветствуются.
Привет @boje, спасибо за комментарий .... я обнаружил, что виноват размер разделов (!) .... так что мое исправление заключалось в том, чтобы оптимизировать размер / количество разделов, изменив это число: sc.parallelize (xml_map.toSeq, 150) .saveAsSequenceFile ("c: / temp / today"), в результате получается 150 разделов каждый примерно по 18 Мбайт, или 50 разделов каждый примерно по 50 Мбайт, или что-то среднее между ними. Это зависит от индивидуальной задачи, размера торговли на скорость и т. д.
Мое окончательное решение:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")
Оригинальный zip-архив 325 Мб, содержащий 170 000 файлов XML. В результате получилось 75 перегородок, каждая ок. 35 Мб. Всего ~ 2.5 Гб Время работы локально на моем ПК с Windows: 1,2 минуты :-)
Вы проверяли, когда возникает ошибка? После 1.000, 10.000 или 100.000 файлов? Когда вы распаковываете файл, сколько места он занимает?