Я использую Spark 2.3, и мне нужно сохранить фрейм данных Spark в файл csv, и я ищу лучший способ сделать это ... просматривая связанные / похожие вопросы, я нашел Вот этот, но мне нужен более конкретный:
Если DataFrame слишком большой, как я могу избежать использования Pandas? Потому что я использовал функцию toCSV() (код ниже), и она произвела:
Out Of Memory error (could not allocate memory).
Лучше ли напрямую писать в CSV с использованием файлового ввода-вывода? Сможет ли он сохранить разделители?
Использование df.coalesce(1).write.option("header", "true").csv('mycsv.csv') приведет к тому, что заголовок будет записан в каждом файле, и когда файлы будут объединены, они будут иметь заголовки посередине. Я ошибся?
С точки зрения производительности лучше использовать Spark write, а затем hadoop getmerge, чем использовать coalesce?
def toCSV(spark_df, n=None, save_csv=None, csv_sep=',', csv_quote='"'):
"""get spark_df from hadoop and save to a csv file
Parameters
----------
spark_df: incoming dataframe
n: number of rows to get
save_csv=None: filename for exported csv
Returns
-------
"""
# use the more robust method
# set temp names
tmpfilename = save_csv or (wfu.random_filename() + '.csv')
tmpfoldername = wfu.random_filename()
print n
# write sparkdf to hadoop, get n rows if specified
if n:
spark_df.limit(n).write.csv(tmpfoldername, sep=csv_sep, quote=csv_quote)
else:
spark_df.write.csv(tmpfoldername, sep=csv_sep, quote=csv_quote)
# get merge file from hadoop
HDFSUtil.getmerge(tmpfoldername, tmpfilename)
HDFSUtil.rmdir(tmpfoldername)
# read into pandas df, remove tmp csv file
pd_df = pd.read_csv(tmpfilename, names=spark_df.columns, sep=csv_sep, quotechar=csv_quote)
os.remove(tmpfilename)
# re-write the csv file with header!
if save_csv is not None:
pd_df.to_csv(save_csv, sep=csv_sep, quotechar=csv_quote)
Цель функции - сохранить фрейм данных искры в файл csv .. извините, я удалю возврат.






If the DataFrame is too big, how can I avoid using Pandas?
Вы можете просто сохранить файл в HDFS или S3 или в любое другое распределенное хранилище, которое у вас есть.
Is directly writing to a csv using file I/O a better way? Can it preserve the separators?
Если вы имеете в виду сохранение файла в локальное хранилище - это все равно вызовет исключение OOM, поскольку для этого вам нужно будет переместить все данные в памяти на локальном компьютере.
Using df.coalesce(1).write.option("header", "true").csv('mycsv.csv') will cause the header to be written in each file and when the files are merged, it will have headers in the middle. Am I wrong?
В этом случае у вас будет только 1 файл (поскольку у вас coalesce(1)). Так что вам не нужно заботиться о заголовках. Вместо этого - вам следует позаботиться о памяти исполнителей - вы можете получить OOM для исполнителя, поскольку все данные будут перемещены в этот исполнитель.
Using spark write and then hadoop getmerge is better than using coalesce from the point of performance?
Определенно лучше (но не используйте coalesce()). Spark будет эффективно записывать данные в хранилище, затем HDFS будет дублировать данные, и после этого getmerge сможет эффективно читать данные с узлов и объединять их.
На самом деле, я сохраняю в HDFS (это то, что делает getmerge, не так ли?), Но мне нужен 1 файл csv локально, чтобы использовать его в дальнейших операциях ... чтобы мне не приходилось повторять его чтение из HDFS, что дорого ..
hadoop fs -getmerge [-nl] <src> <localdst> - берет каталог в HDFS и объединяет все файлы в один в локальной файловой системе в соответствии с документами здесь.
Мы использовали библиотеку databricks. Отлично работает
df.save("com.databricks.spark.csv", SaveMode.Overwrite, Map("delimiter" -> delim, "nullValue" -> "-", "path" -> tempFPath))
Библиотека:
<!-- spark df to csv -->
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.3.0</version>
</dependency>
работает даже на версиях spark 2.x? Думал только на спарк 1.х
для Spark 2 добавьте spark-cav в путь к классам, затем перейдите в df.write.format ("csv"). save (path)
Было бы лучше, если бы вы уменьшили функцию до представления основных функций, которые вы хотите реализовать. Спрашивать как я могу избежать использования панд, когда функция возвращает
pandas.core.frame.DataFrame, не имеет смысла.