Соединение двух фреймов данных pyspark и продолжение вычисления суммы и максимума текущего окна

У меня есть два искровых фрейма данных

фрейм данных_1:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0     |0        |0    |
|2023-08-18T00:10:00.000+0000|1     |1        |1    |
|2023-08-18T00:20:00.000+0000|1     |2        |1    |
|2023-08-18T00:30:00.000+0000|0     |2        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
+----------------------------+------+---------+-----+

фрейм данных_2:

+----------------------------+------+
|timestamp                   |target|
+----------------------------+------+
|2023-08-18T01:10:00.000+0000|1     |
|2023-08-18T01:20:00.000+0000|1     |
|2023-08-18T01:30:00.000+0000|1     |
|2023-08-18T01:40:00.000+0000|0     |
|2023-08-18T01:50:00.000+0000|1     |
|2023-08-18T02:00:00.000+0000|0     |
+----------------------------+------+

По меткам времени вы можете видеть, что оба фрейма данных представляют собой агрегированные данные за 10 минут, а dataframe_2 — это следующий по порядку объем данных (при условии, что он выполняется в режиме реального времени каждые несколько часов).

Я хотел бы объединить эти два кадра данных, но сохранить расчеты суммы окна и максимального окна.

Я вычисляю два столбца, называемые counter_1 и max_1. Это просто сумма окна и максимум окна.

Окно, которое я использую (но на самом деле между ним может быть любое количество строк):

window = (Window.partitionBy().orderBy("timestamp").rowsBetween(-4, 0))

Ожидаемый результат будет таким:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0     |0        |0    |
|2023-08-18T00:10:00.000+0000|1     |1        |1    |
|2023-08-18T00:20:00.000+0000|1     |2        |1    |
|2023-08-18T00:30:00.000+0000|0     |2        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

Я пробовал несколько способов группировки данных и агрегирования сумм и максимальных значений исходного фрейма данных, но мне не повезло.

РЕДАКТИРОВАНИЕ 29.03.24

Забыл упомянуть, как это должно работать с точки зрения дизайна.

Первый фрейм данных не всегда должен начинаться с 0 для counter_1 и max_1. Могут существовать предыдущие данные, которые устанавливают эти значения до максимального значения окна для счетчика_1 или 1 для max_1.

Примером этого может быть dataframe1:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
+----------------------------+------+---------+-----+

и это dataframe2:

+----------------------------+------+
|timestamp                   |target|
+----------------------------+------+
|2023-08-18T01:10:00.000+0000|1     |
|2023-08-18T01:20:00.000+0000|1     |
|2023-08-18T01:30:00.000+0000|1     |
|2023-08-18T01:40:00.000+0000|0     |
|2023-08-18T01:50:00.000+0000|1     |
|2023-08-18T02:00:00.000+0000|0     |
+----------------------------+------+

Если вы используете решение ниже, вы получите следующий результат:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |1        |1    |
|2023-08-18T00:10:00.000+0000|1     |2        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |4        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

Это неверно, поскольку в идеале для первого кадра данных не должны быть пересчитаны значения, потому что они были рассчитаны уже просто для добавления новых данных, продолжающих окна.

Ожидаемый результат этих двух действий будет таким:

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

Предоставьте достаточно кода, чтобы другие могли лучше понять или воспроизвести проблему.

Community 29.03.2024 16:24
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
1
61
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

from pyspark.sql.functions import lit,sum,max
from pyspark.sql.window import Window

window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)

dataframe_2 = dataframe_2.withColumn("counter_1", lit(0)).withColumn("max_1", lit(0))
union_df = dataframe_1.union(dataframe_2)

# Calculate the running sum and max over the window
result_df = union_df \
    .withColumn("counter_1", sum("target").over(window_spec)) \
    .withColumn("max_1", max("target").over(window_spec))

results.show(truncate=False)

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0     |0        |0    |
|2023-08-18T00:10:00.000+0000|1     |1        |1    |
|2023-08-18T00:20:00.000+0000|1     |2        |1    |
|2023-08-18T00:30:00.000+0000|0     |2        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
+----------------------------+------+---------+-----+

проблема с этим решением заключается в том, что предполагается, что первая временная метка первого кадра данных всегда будет counter_1 и max_1, равным 0 или 1, в зависимости от того, что имеет целевой столбец, но существующие данные для counter_1 в первой строке могут быть неактивны. до максимального окна (в данном случае 4). Если я получаю новые данные каждые 4 часа, допустим, что первая строка, скорее всего, не будет равна 0 для счетчика_1.

Aaron Brazier 29.03.2024 14:03
Ответ принят как подходящий

Для всех, кто обнаружит это и столкнется с аналогичной проблемой, это, по-видимому, решение, к которому я собираюсь прибегнуть после долгих поисков с CoPilot и внесения изменений в соответствии с моим вариантом использования.

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, lit
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Continuous DataFrame Join and Window Calculations") \
    .getOrCreate()

# Define the dataframes
df1 = spark.createDataFrame([
    ("2023-08-18T00:00:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:10:00.000+0000", 1, 4, 1),
    ("2023-08-18T00:20:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:30:00.000+0000", 0, 3, 1),
    ("2023-08-18T00:40:00.000+0000", 1, 3, 1),
    ("2023-08-18T00:50:00.000+0000", 0, 3, 1),
    ("2023-08-18T01:00:00.000+0000", 1, 3, 1)
], ["timestamp", "target", "counter_1", "max_1"])

df2 = spark.createDataFrame([
    ("2023-08-18T01:10:00.000+0000", 1),
    ("2023-08-18T01:20:00.000+0000", 1),
    ("2023-08-18T01:30:00.000+0000", 1),
    ("2023-08-18T01:40:00.000+0000", 0),
    ("2023-08-18T01:50:00.000+0000", 1),
    ("2023-08-18T02:00:00.000+0000", 0)
], ["timestamp", "target"])

# Define the window specification
window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)

# Add 'counter_1' and 'max_1' columns to df2
df2 = df2.withColumn("counter_1", lit(None))
df2 = df2.withColumn("max_1", lit(None))

# Combine the last 4 rows of df1 and all rows of df2
combined_df = df1.orderBy("timestamp", ascending=False).limit(4).union(df2).sort('timestamp')

# Calculate 'counter_1' and 'max_1' for the combined dataframe
combined_df = combined_df.withColumn("counter_1", sum("target").over(window_spec))
combined_df = combined_df.withColumn("max_1",max("target").over(window_spec))

# Replace the 'counter_1' and 'max_1' values in df2 with the calculated values
df2 = df2.drop("counter_1", "max_1")
df2 = df2.join(combined_df, ["timestamp", "target"], how = "left")
# Concatenate df1 and df2
result = df1.union(df2)
result.show(truncate=False)

+----------------------------+------+---------+-----+
|timestamp                   |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1     |3        |1    |
|2023-08-18T00:10:00.000+0000|1     |4        |1    |
|2023-08-18T00:20:00.000+0000|1     |3        |1    |
|2023-08-18T00:30:00.000+0000|0     |3        |1    |
|2023-08-18T00:40:00.000+0000|1     |3        |1    |
|2023-08-18T00:50:00.000+0000|0     |3        |1    |
|2023-08-18T01:00:00.000+0000|1     |3        |1    |
|2023-08-18T01:10:00.000+0000|1     |3        |1    |
|2023-08-18T01:30:00.000+0000|1     |4        |1    |
|2023-08-18T01:20:00.000+0000|1     |4        |1    |
|2023-08-18T01:40:00.000+0000|0     |4        |1    |
|2023-08-18T02:00:00.000+0000|0     |3        |1    |
|2023-08-18T01:50:00.000+0000|1     |4        |1    |
+----------------------------+------+---------+-----+

Другие вопросы по теме