У меня есть фрейм данных df1, как показано ниже, со схемой:
scala> df1.printSchema
root
|-- filecontent: binary (nullable = true)
|-- filename: string (nullable = true)
У DF есть имя файла и его содержимое. Содержимое хранится в формате GZIP. Я мог бы использовать что-то вроде приведенного ниже, чтобы распаковать данные в файле и сохранить их в HDFS.
def decompressor(origRow: Row) = {
val filename = origRow.getString(1)
val filecontent = serialise(origRow.getString(0))
val unzippedData = new GZIPInputStream(new ByteArrayInputStream(filecontent))
val hadoop_fs = FileSystem.get(sc.hadoopConfiguration)
val filenamePath = new Path(filename)
val fos = hadoop_fs.create(filenamePath)
org.apache.hadoop.io.IOUtils.copyBytes(unzippedData, fos, sc.hadoopConfiguration)
fos.close()
}
Моя цель:
Поскольку данные столбца filecontent в df1 являются двоичными, то есть Array [byte], я не должен распространять данные и хранить их вместе и передавать их функции, чтобы она могла распаковать и сохранить их в файл.
Мой вопрос:
Распределение данных и параллельная обработка - основная тема Spark и Hadoop. Вы по-прежнему можете обрабатывать строку за строкой распределенным образом.
df1 является результатом очень длинного искрового кода MPP, и выполнение этой построчной операции является одним из многих шагов. Мне нужно ограничить это построчной операцией только для этого сценария.





Если вы не хотите распространять или распараллеливать, не используйте Spark? Единственное, что вы действительно можете сделать, это собрать или преобразовать
toLocalIterator.