Я пытаюсь добавить произвольное количество кадров данных PySpark вместе.
Это делается с помощью функции union_all
ниже:
from functools import reduce
from pyspark.sql import DataFrame
def union_all(*dfs):
return reduce(DataFrame.union, dfs)
При оценке он возвращает TypeError
:
TypeError: reduce() of empty sequence with no initial value
Следующий поток охватывает тот же TypeError
, но для другого случая (лямбда-функция в диапазоне целых чисел):
как исправить функцию reduce() пустой последовательности без ошибки начального значения?
Из этого обсуждения решение заключалось в том, чтобы предоставить инициализатор функции reduce
.
В моем случае это может быть фрейм данных PySpark, поэтому я использую первый элемент в своем списке фреймов данных:
def union_all(*dfs):
return reduce(DataFrame.union, dfs, dfs[0])
Вызов вышеуказанной функции вызывает IndexError
:
IndexError: tuple index out of range
Что это значит и как это можно решить?
Данные, которые я использую (dfs
):
Каждый элемент в dfs
представляет собой фрейм данных с одинаковыми столбцами в том же порядке (всего 3 фрейма данных). Это один из фреймов данных в качестве примера:
DataFrame[id: bigint, index: int, sn: string, sid: string, dt: string, ps: string, fr: string, hr:
string, pn: string, aid: string, mf: string, mn: string]
поместите print(dfs)
в начало вашей функции - это покажет вам, что содержит dfs. скорее всего ничего не содержит.
Я подозреваю, что это должно быть: reduce(lambda x, y: x.union(y), dfs)
, поскольку union
вызывается для объекта df, а второй передается в качестве аргумента. В том, как вы это реализовали, я думаю, что reduce
может вместо этого попытаться сделать что-то вроде union(x, y)
.
@ go2nirvana Вызов метода через класс — это нормально, но сначала вам нужно явно передать параметр self
. Это означает, что DataFrame.union(x, y)
совпадает с x.union(y)
(при условии, что x
имеет тип DataFrame
)
@ЕвгенКузьмович, ты прав. Никогда не использовали методы экземпляра вне экземпляра. Это новое, спасибо.
Кажется, вы передаете пустую последовательность в функцию union_all в качестве аргумента dfs
. Исходное значение включается в последовательность в качестве первого элемента для операции сокращения.
https://docs.python.org/3/library/functools.html#functools.reduce
Если необязательный инициализатор присутствует, он помещается перед элементами итерации в расчете и служит по умолчанию, когда итерируемый пуст.
Поэтому, когда вы предоставляете пустую последовательность, если вы не хотите получать какие-либо ошибки, вы должны указать инициализатор, поэтому в этом случае он соответствует пустому фрейму данных. Однако это бессмысленно, потому что у вас нет информации о схеме для создания соответствующего фрейма данных. Поэтому, вероятно, лучше проверить, что данный параметр имеет значение, и проверить, что данные кадры данных имеют одинаковую схему для операции объединения. Пример;
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame
sparkSession = SparkSession.builder.getOrCreate()
# example schema
struct = StructType()
struct.add("a", "integer")
struct.add("b", "integer")
struct.add("c", "integer")
# example dataframes
df_1 = sparkSession.createDataFrame(data=[(1, 2, 3), ], schema=struct)
df_2 = sparkSession.createDataFrame(data=[(10, 20, 30), ], schema=struct)
df_3 = sparkSession.createDataFrame(data=[(100, 200, 300), ], schema=struct)
def union_dfs(*dfs):
if not dfs: # check dfs tuple is non-empty
raise ValueError("At least one dataframe must be provided")
schemas = [df.schema for df in dfs]
if len(set(schemas)) != 1: # validate all dfs have the same schema
raise ValueError("Each of df's schema must be the same")
result_df = reduce(DataFrame.union, dfs)
return result_df
union_df = union_dfs(df_1, df_2, df_3)
union_df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 2| 3|
| 10| 20| 30|
|100|200|300|
+---+---+---+
Спасибо, @sagmansercan. Оказалось, что dfs
были пусты. Ваша обработка ошибок помогла мне это увидеть.
Это означает, что
dfs
пусто. Укажите строку, в которой вы вызываете функциюunion_all
.