Я выполняю вычисление скользящей медианы для отдельных кадров данных временных рядов, затем я хочу объединить/добавить результаты.
# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())
series_list = ['0620', '5914']
SeriesAppend=[]
for item in series_list:
# Filter for select item
series = test_df.where(col("ID").isin([item]))
# Sort time series
series_sorted = series.sort(series.ID,
series.date).persist()
# Calculate rolling median
series_sorted = series_sorted.withColumn("list",
collect_list("metric").over(w)) \
.withColumn("rolling_median", median_udf("list"))
SeriesAppend.append(series_sorted)
SeriesAppend
[DataFrame[ntwrk_genre_cd: строка, дата: дата, mkt_cd: строка, syscode: строка, ntwrk_cd: строка, syscode_ntwrk: строка, метрика: двойная, список: массив, roll_median: float], DataFrame[ntwrk_genre_cd: строка, дата: дата, mkt_cd: строка, syscode: строка, ntwrk_cd: строка, syscode_ntwrk: строка, метрика: double, список: массив, roll_median: float]]
Когда я пытаюсь .show():
'list' object has no attribute 'show'
Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'
Я понимаю, что это говорит о том, что объект является список фреймов данных. Как преобразовать в один фрейм данных?
Я знаю, что следующее решение работает для количества кадров данных явный, но я хочу, чтобы мой цикл for не зависел от количества кадров данных:
from functools import reduce
from pyspark.sql import DataFrame
dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)
Есть ли способ обобщить это на неявные имена фреймов данных?
объединить их всех вместе. Один из способов — использовать functools.reduce
и сделать следующее: reduce(lambda a, b: a.union(b), SeriesAppend[1:], SeriesAppend[0])
Возможный дубликат Spark unionВсе несколько фреймов данных. Второй ответ для pyspark.
Если вы добавите "ID"
в свое окно w
в качестве другого аргумента partitionBy, вам вообще не нужно выполнять цикл for и объединение. Просто подставьте фрейм данных в нужные идентификаторы test_df = test_df.where(col("ID").isin(series_list))
, и все готово.
Ричард, это предложение сработает, но я не буду знать все свои удостоверения личности. Например, будет где-то около 30 тысяч серий, но точное N не определено.
@mwhee, что вы подразумеваете под явным количеством кадров данных? смысл использования reduce
состоит в том, чтобы выполнять функцию (здесь объединение) столько раз, сколько вам нужно. Если вы делаете df = reduce(DataFrame.unionAll, SeriesAppend)
вне цикла for
, вам не нужно нигде указывать номер кадра данных. Или есть что-то еще, что я пропустил/не понял?
Всем спасибо! Подводя итог - решение использует Reduce и unionAll:
from functools import reduce
from pyspark.sql import DataFrame
SeriesAppend=[]
for item in series_list:
# Filter for select item
series = test_df.where(col("ID").isin([item]))
# Sort time series
series_sorted = series.sort(series.ID,
series.date).persist()
# Calculate rolling median
series_sorted = series_sorted.withColumn("list",
collect_list("metric").over(w)) \
.withColumn("rolling_median", median_udf("list"))
SeriesAppend.append(series_sorted)
df_series = reduce(DataFrame.unionAll, SeriesAppend)
Вы должны добавить в свой ответ строки from functools import reduce
from pyspark.sql import DataFrame
Чтобы людям не приходилось искать дальше.
@Laurent - Спасибо, я добавил в решение библиотеки импорта.
Спасибо. Ваш ответ был очень полезен для меня.
Я думаю, вам нужно
union
. Взгляните на этот отвечать, явно описан метод объединения нескольких кадров данных из списка.