У меня есть два искровых фрейма данных
фрейм данных_1:
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0 |0 |0 |
|2023-08-18T00:10:00.000+0000|1 |1 |1 |
|2023-08-18T00:20:00.000+0000|1 |2 |1 |
|2023-08-18T00:30:00.000+0000|0 |2 |1 |
|2023-08-18T00:40:00.000+0000|1 |3 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
+----------------------------+------+---------+-----+
фрейм данных_2:
+----------------------------+------+
|timestamp |target|
+----------------------------+------+
|2023-08-18T01:10:00.000+0000|1 |
|2023-08-18T01:20:00.000+0000|1 |
|2023-08-18T01:30:00.000+0000|1 |
|2023-08-18T01:40:00.000+0000|0 |
|2023-08-18T01:50:00.000+0000|1 |
|2023-08-18T02:00:00.000+0000|0 |
+----------------------------+------+
По меткам времени вы можете видеть, что оба фрейма данных представляют собой агрегированные данные за 10 минут, а dataframe_2 — это следующий по порядку объем данных (при условии, что он выполняется в режиме реального времени каждые несколько часов).
Я хотел бы объединить эти два кадра данных, но сохранить расчеты суммы окна и максимального окна.
Я вычисляю два столбца, называемые counter_1 и max_1. Это просто сумма окна и максимум окна.
Окно, которое я использую (но на самом деле между ним может быть любое количество строк):
window = (Window.partitionBy().orderBy("timestamp").rowsBetween(-4, 0))
Ожидаемый результат будет таким:
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0 |0 |0 |
|2023-08-18T00:10:00.000+0000|1 |1 |1 |
|2023-08-18T00:20:00.000+0000|1 |2 |1 |
|2023-08-18T00:30:00.000+0000|0 |2 |1 |
|2023-08-18T00:40:00.000+0000|1 |3 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
|2023-08-18T01:10:00.000+0000|1 |3 |1 |
|2023-08-18T01:20:00.000+0000|1 |4 |1 |
|2023-08-18T01:30:00.000+0000|1 |4 |1 |
|2023-08-18T01:40:00.000+0000|0 |4 |1 |
|2023-08-18T01:50:00.000+0000|1 |4 |1 |
|2023-08-18T02:00:00.000+0000|0 |3 |1 |
+----------------------------+------+---------+-----+
Я пробовал несколько способов группировки данных и агрегирования сумм и максимальных значений исходного фрейма данных, но мне не повезло.
Забыл упомянуть, как это должно работать с точки зрения дизайна.
Первый фрейм данных не всегда должен начинаться с 0 для counter_1 и max_1. Могут существовать предыдущие данные, которые устанавливают эти значения до максимального значения окна для счетчика_1 или 1 для max_1.
Примером этого может быть dataframe1:
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1 |3 |1 |
|2023-08-18T00:10:00.000+0000|1 |4 |1 |
|2023-08-18T00:20:00.000+0000|1 |3 |1 |
|2023-08-18T00:30:00.000+0000|0 |3 |1 |
|2023-08-18T00:40:00.000+0000|1 |3 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
+----------------------------+------+---------+-----+
и это dataframe2:
+----------------------------+------+
|timestamp |target|
+----------------------------+------+
|2023-08-18T01:10:00.000+0000|1 |
|2023-08-18T01:20:00.000+0000|1 |
|2023-08-18T01:30:00.000+0000|1 |
|2023-08-18T01:40:00.000+0000|0 |
|2023-08-18T01:50:00.000+0000|1 |
|2023-08-18T02:00:00.000+0000|0 |
+----------------------------+------+
Если вы используете решение ниже, вы получите следующий результат:
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1 |1 |1 |
|2023-08-18T00:10:00.000+0000|1 |2 |1 |
|2023-08-18T00:20:00.000+0000|1 |3 |1 |
|2023-08-18T00:30:00.000+0000|0 |3 |1 |
|2023-08-18T00:40:00.000+0000|1 |4 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
|2023-08-18T01:10:00.000+0000|1 |3 |1 |
|2023-08-18T01:20:00.000+0000|1 |4 |1 |
|2023-08-18T01:30:00.000+0000|1 |4 |1 |
|2023-08-18T01:40:00.000+0000|0 |4 |1 |
|2023-08-18T01:50:00.000+0000|1 |4 |1 |
|2023-08-18T02:00:00.000+0000|0 |3 |1 |
+----------------------------+------+---------+-----+
Это неверно, поскольку в идеале для первого кадра данных не должны быть пересчитаны значения, потому что они были рассчитаны уже просто для добавления новых данных, продолжающих окна.
Ожидаемый результат этих двух действий будет таким:
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1 |3 |1 |
|2023-08-18T00:10:00.000+0000|1 |4 |1 |
|2023-08-18T00:20:00.000+0000|1 |3 |1 |
|2023-08-18T00:30:00.000+0000|0 |3 |1 |
|2023-08-18T00:40:00.000+0000|1 |3 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
|2023-08-18T01:10:00.000+0000|1 |3 |1 |
|2023-08-18T01:20:00.000+0000|1 |4 |1 |
|2023-08-18T01:30:00.000+0000|1 |4 |1 |
|2023-08-18T01:40:00.000+0000|0 |4 |1 |
|2023-08-18T01:50:00.000+0000|1 |4 |1 |
|2023-08-18T02:00:00.000+0000|0 |3 |1 |
+----------------------------+------+---------+-----+
from pyspark.sql.functions import lit,sum,max
from pyspark.sql.window import Window
window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)
dataframe_2 = dataframe_2.withColumn("counter_1", lit(0)).withColumn("max_1", lit(0))
union_df = dataframe_1.union(dataframe_2)
# Calculate the running sum and max over the window
result_df = union_df \
.withColumn("counter_1", sum("target").over(window_spec)) \
.withColumn("max_1", max("target").over(window_spec))
results.show(truncate=False)
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|0 |0 |0 |
|2023-08-18T00:10:00.000+0000|1 |1 |1 |
|2023-08-18T00:20:00.000+0000|1 |2 |1 |
|2023-08-18T00:30:00.000+0000|0 |2 |1 |
|2023-08-18T00:40:00.000+0000|1 |3 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
|2023-08-18T01:10:00.000+0000|1 |3 |1 |
|2023-08-18T01:20:00.000+0000|1 |4 |1 |
|2023-08-18T01:30:00.000+0000|1 |4 |1 |
|2023-08-18T01:40:00.000+0000|0 |4 |1 |
|2023-08-18T01:50:00.000+0000|1 |4 |1 |
|2023-08-18T02:00:00.000+0000|0 |3 |1 |
+----------------------------+------+---------+-----+
проблема с этим решением заключается в том, что предполагается, что первая временная метка первого кадра данных всегда будет counter_1 и max_1, равным 0 или 1, в зависимости от того, что имеет целевой столбец, но существующие данные для counter_1 в первой строке могут быть неактивны. до максимального окна (в данном случае 4). Если я получаю новые данные каждые 4 часа, допустим, что первая строка, скорее всего, не будет равна 0 для счетчика_1.
Для всех, кто обнаружит это и столкнется с аналогичной проблемой, это, по-видимому, решение, к которому я собираюсь прибегнуть после долгих поисков с CoPilot и внесения изменений в соответствии с моим вариантом использования.
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, max, lit
from pyspark.sql.window import Window
# Initialize SparkSession
spark = SparkSession.builder \
.appName("Continuous DataFrame Join and Window Calculations") \
.getOrCreate()
# Define the dataframes
df1 = spark.createDataFrame([
("2023-08-18T00:00:00.000+0000", 1, 3, 1),
("2023-08-18T00:10:00.000+0000", 1, 4, 1),
("2023-08-18T00:20:00.000+0000", 1, 3, 1),
("2023-08-18T00:30:00.000+0000", 0, 3, 1),
("2023-08-18T00:40:00.000+0000", 1, 3, 1),
("2023-08-18T00:50:00.000+0000", 0, 3, 1),
("2023-08-18T01:00:00.000+0000", 1, 3, 1)
], ["timestamp", "target", "counter_1", "max_1"])
df2 = spark.createDataFrame([
("2023-08-18T01:10:00.000+0000", 1),
("2023-08-18T01:20:00.000+0000", 1),
("2023-08-18T01:30:00.000+0000", 1),
("2023-08-18T01:40:00.000+0000", 0),
("2023-08-18T01:50:00.000+0000", 1),
("2023-08-18T02:00:00.000+0000", 0)
], ["timestamp", "target"])
# Define the window specification
window_spec = Window.orderBy("timestamp").rowsBetween(-4, 0)
# Add 'counter_1' and 'max_1' columns to df2
df2 = df2.withColumn("counter_1", lit(None))
df2 = df2.withColumn("max_1", lit(None))
# Combine the last 4 rows of df1 and all rows of df2
combined_df = df1.orderBy("timestamp", ascending=False).limit(4).union(df2).sort('timestamp')
# Calculate 'counter_1' and 'max_1' for the combined dataframe
combined_df = combined_df.withColumn("counter_1", sum("target").over(window_spec))
combined_df = combined_df.withColumn("max_1",max("target").over(window_spec))
# Replace the 'counter_1' and 'max_1' values in df2 with the calculated values
df2 = df2.drop("counter_1", "max_1")
df2 = df2.join(combined_df, ["timestamp", "target"], how = "left")
# Concatenate df1 and df2
result = df1.union(df2)
result.show(truncate=False)
+----------------------------+------+---------+-----+
|timestamp |target|counter_1|max_1|
+----------------------------+------+---------+-----+
|2023-08-18T00:00:00.000+0000|1 |3 |1 |
|2023-08-18T00:10:00.000+0000|1 |4 |1 |
|2023-08-18T00:20:00.000+0000|1 |3 |1 |
|2023-08-18T00:30:00.000+0000|0 |3 |1 |
|2023-08-18T00:40:00.000+0000|1 |3 |1 |
|2023-08-18T00:50:00.000+0000|0 |3 |1 |
|2023-08-18T01:00:00.000+0000|1 |3 |1 |
|2023-08-18T01:10:00.000+0000|1 |3 |1 |
|2023-08-18T01:30:00.000+0000|1 |4 |1 |
|2023-08-18T01:20:00.000+0000|1 |4 |1 |
|2023-08-18T01:40:00.000+0000|0 |4 |1 |
|2023-08-18T02:00:00.000+0000|0 |3 |1 |
|2023-08-18T01:50:00.000+0000|1 |4 |1 |
+----------------------------+------+---------+-----+
Предоставьте достаточно кода, чтобы другие могли лучше понять или воспроизвести проблему.