PySpark: как добавить кадры данных в цикл for

Я выполняю вычисление скользящей медианы для отдельных кадров данных временных рядов, затем я хочу объединить/добавить результаты.

# UDF for rolling median
median_udf = udf(lambda x: float(np.median(x)), FloatType())

series_list = ['0620', '5914']
SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
    series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
        collect_list("metric").over(w)) \
        .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

SeriesAppend

[DataFrame[ntwrk_genre_cd: строка, дата: дата, mkt_cd: строка, syscode: строка, ntwrk_cd: строка, syscode_ntwrk: строка, метрика: двойная, список: массив, roll_median: float], DataFrame[ntwrk_genre_cd: строка, дата: дата, mkt_cd: строка, syscode: строка, ntwrk_cd: строка, syscode_ntwrk: строка, метрика: double, список: массив, roll_median: float]]

Когда я пытаюсь .show():

'list' object has no attribute 'show'
Traceback (most recent call last):
AttributeError: 'list' object has no attribute 'show'

Я понимаю, что это говорит о том, что объект является список фреймов данных. Как преобразовать в один фрейм данных?

Я знаю, что следующее решение работает для количества кадров данных явный, но я хочу, чтобы мой цикл for не зависел от количества кадров данных:

from functools import reduce
from pyspark.sql import DataFrame

dfs = [df1,df2,df3]
df = reduce(DataFrame.unionAll, dfs)

Есть ли способ обобщить это на неявные имена фреймов данных?

Я думаю, вам нужно union. Взгляните на этот отвечать, явно описан метод объединения нескольких кадров данных из списка.

Ben.T 29.05.2019 18:19

объединить их всех вместе. Один из способов — использовать functools.reduce и сделать следующее: reduce(lambda a, b: a.union(b), SeriesAppend[1:], SeriesAppend[0])

pault 29.05.2019 18:20

Возможный дубликат Spark unionВсе несколько фреймов данных. Второй ответ для pyspark.

pault 29.05.2019 18:29

Если вы добавите "ID" в свое окно w в качестве другого аргумента partitionBy, вам вообще не нужно выполнять цикл for и объединение. Просто подставьте фрейм данных в нужные идентификаторы test_df = test_df.where(col("ID").isin(series_list)), и все готово.

Richard Nemeth 29.05.2019 20:13

Ричард, это предложение сработает, но я не буду знать все свои удостоверения личности. Например, будет где-то около 30 тысяч серий, но точное N не определено.

mwhee 29.05.2019 20:58

@mwhee, что вы подразумеваете под явным количеством кадров данных? смысл использования reduce состоит в том, чтобы выполнять функцию (здесь объединение) столько раз, сколько вам нужно. Если вы делаете df = reduce(DataFrame.unionAll, SeriesAppend) вне цикла for, вам не нужно нигде указывать номер кадра данных. Или есть что-то еще, что я пропустил/не понял?

Ben.T 29.05.2019 21:51
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
7
6
16 008
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Всем спасибо! Подводя итог - решение использует Reduce и unionAll:

from functools import reduce
from pyspark.sql import DataFrame

SeriesAppend=[]

for item in series_list:
    # Filter for select item
    series = test_df.where(col("ID").isin([item]))
    # Sort time series
    series_sorted = series.sort(series.ID, 
    series.date).persist()
    # Calculate rolling median
    series_sorted = series_sorted.withColumn("list", 
         collect_list("metric").over(w)) \
         .withColumn("rolling_median", median_udf("list"))

    SeriesAppend.append(series_sorted)

df_series = reduce(DataFrame.unionAll, SeriesAppend)

Вы должны добавить в свой ответ строки from functools import reducefrom pyspark.sql import DataFrame Чтобы людям не приходилось искать дальше.

Laurent 02.12.2021 14:09

@Laurent - Спасибо, я добавил в решение библиотеки импорта.

mwhee 03.12.2021 15:05

Спасибо. Ваш ответ был очень полезен для меня.

Laurent 03.12.2021 16:35

Другие вопросы по теме