У меня есть фрейм данных pyspark, содержащий два столбца. Прибытие и отъезд. Идея состоит в том, чтобы вычислить количество событий отправления, попадающих в указанный интервал, рассчитанный на основе времени прибытия. Например, если товар прибыл в 23:00, я хотел бы взять окно продолжительностью -12 часов [11:00, 23:00] и вычислить количество товаров, ушедших в течение этого интервала времени.
Вот мой код для его создания. Но, как вы видите, это не работает, поскольку я могу агрегировать часы либо по столбцу прибытия_времени, либо по столбцу dept_time.
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("rolling_window_example").getOrCreate()
# Sample data
data = [
("2024-05-10 02:00:00", "2024-05-10 21:30:00", 0, 1),
("2024-05-12 14:10:00", "2024-05-13 02:00:00", 1, 1),
("2024-05-05 03:00:00", "2024-05-14 03:30:00", 2, 2),
("2024-05-14 01:32:00", "2024-05-14 23:30:00", 0, 2),
("2024-05-14 01:00:00", "2024-05-15 01:30:00", 0, 1)
]
columns = ["dept_time", "arrival_time", "ground_truth_12", "ground_truth_24"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df = df.withColumn("dept_timestamp", col("dept_time").cast("timestamp"))
df = df.withColumn("arrival_timestamp", col("arrival_time").cast("timestamp"))
df = df.withColumn("dept_time", col("dept_time").cast("timestamp").cast("long"))
df = df.withColumn("arrival_time", col("arrival_time").cast("timestamp").cast("long"))
# Calculate windows wrt arrival time
window_12 = Window.partitionBy().orderBy("arrival_time").rangeBetween(-12 * 3600, Window.currentRow)
window_24 = Window.partitionBy().orderBy("arrival_time").rangeBetween(-24 * 3600, Window.currentRow)
df_rolling = df \
.orderBy("dept_time") \
.withColumn("t_12_count", f.count("dept_time").over(window_12)) \
.withColumn("t_24_count", f.count("dept_time").over(window_24))
# Show results
display(df_rolling)
Результат не такой, как ожидалось:
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
| dept_time|arrival_time|ground_truth_12|ground_truth_24| dept_timestamp| arrival_timestamp|t_12_count|t_24_count|
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
|1715306400| 1715376600| 0| 1|2024-05-10 02:00:00|2024-05-10 21:30:00| 1| 1|
|1715523000| 1715565600| 1| 1|2024-05-12 14:10:00|2024-05-13 02:00:00| 1| 1|
|1714878000| 1715657400| 2| 2|2024-05-05 03:00:00|2024-05-14 03:30:00| 1| 1|
|1715650320| 1715729400| 0| 2|2024-05-14 01:32:00|2024-05-14 23:30:00| 1| 2|
|1715648400| 1715736600| 0| 1|2024-05-14 01:00:00|2024-05-15 01:30:00| 2| 3|
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
Ожидаемый результат можно увидеть в столбцах: ground_truth_12
и ground_truth_24
для 12-часового и 24-часового окна соответственно.
ground_truth_12 и ground_truth_24 обозначают результат :)
Почему вы публикуете решение через чатгпт? Опубликуйте свою реальную проблему с данными. Выполните расчеты вручную. И дайте знать, почему этот расчет верен в соответствии с вашими требованиями.
Это не решение чатгпта. Это работаю сам.
@MuhammadIrfanAli: Я опубликовал решение. дайте мне знать, как это сработает для вас. Сколько времени потребуется на выполнение и т.д.
@user238607 user238607 большое спасибо. Я оцениваю решение и скоро прокомментирую.
Вот умный способ выяснить все отправления до прибытия текущих строк.
Отметьте соответствующее время флажком прибытия, например "A"
или отправления "D"
.
Теперь объедините эти два кадра данных.
Упорядочите эти кадры данных по времени независимо от метки.
Создайте спецификацию окна, которая будет подсчитывать все строки "D"
, встречающиеся в окне от -12 часов до 0 с (текущая строка/время).
Выполняйте описанную выше операцию только для "A"
строк, поскольку именно это нас волнует в конечном результате.
Аналогично для окна -24 часа.
Важная заметка :
Ниже приведен рабочий пример.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
spark = SparkSession.builder.appName("SelfJoinExample").getOrCreate()
## There is error in the second row grount truth which I have corrected below.
data = [
("2024-05-10 02:00:00", "2024-05-10 21:30:00", 0, 1),
("2024-05-12 12:00:00", "2024-05-13 02:00:00", 0, 1),
("2024-05-05 03:00:00", "2024-05-14 03:30:00", 2, 2),
("2024-05-14 01:32:00", "2024-05-14 23:30:00", 0, 2),
("2024-05-14 01:00:00", "2024-05-15 01:30:00", 0, 1)
]
columns = ["departure_time", "arrival_time", "ground_truth_12", "ground_truth_24"]
# Create DataFrame
df = spark.createDataFrame(data, columns)
df = df.withColumn("departure_timestamp", col("departure_time").cast("timestamp")).drop("departure_time")
df = df.withColumn("arrival_timestamp", col("arrival_time").cast("timestamp")).drop("arrival_time")
df = df.withColumn("mono_id", monotonically_increasing_id())
df = df.withColumn("arrival_label", array(col("arrival_timestamp"), lit("A"), col("mono_id")))
df = df.withColumn("dept_label", array(col("departure_timestamp"), lit("D"), col("mono_id")))
df_arrival = df.select(col("arrival_label").alias("common_name"))
df_dept = df.select(col("dept_label").alias("common_name"))
df_union = df_arrival.union(df_dept)
df_union = df_union.orderBy(col("common_name")[0])
df_union.show(n=1000, truncate=False)
df_mid = df_union.withColumn("timestamp", to_timestamp(df_union["common_name"][0]))
df_mid = df_mid.withColumn("long_ts", col("timestamp").cast("long"))
df_mid = df_mid.withColumn("type", df_union["common_name"][1])
df_mid = df_mid.withColumn("value", df_union["common_name"][2].cast(LongType()))
df_mid = df_mid.drop("common_name")
df_mid = df_mid.withColumn("dept_flag", when(col("type") == "D", 1).otherwise(0))
df_mid.show(truncate=False)
windowSpec12 = Window.orderBy("long_ts").rangeBetween(-12 * 3600, 0)
windowSpec24 = Window.orderBy("long_ts").rangeBetween(-24 * 3600, 0)
df_int = df_mid.withColumn("calc_t12", when(col("type") == "A", sum("dept_flag").over(windowSpec12)).otherwise(None))
df_int = df_int.withColumn("calc_t24", when(col("type") == "A", sum("dept_flag").over(windowSpec24)).otherwise(None))
df_int.show(n=1000, truncate=False)
df_crosscheck = df.join(df_int, on=[col("type") == "A", col("mono_id") == col("value")], how = "inner")
print("Final Result")
df_crosscheck.select("ground_truth_12", "ground_truth_24", "calc_t12", "calc_t24").show(n=1000, truncate=False)
Окончательный фрейм данных перекрестной проверки:
+---------------+---------------+--------+--------+
|ground_truth_12|ground_truth_24|calc_t12|calc_t24|
+---------------+---------------+--------+--------+
|0 |1 |0 |1 |
|0 |1 |0 |1 |
|2 |2 |2 |2 |
|0 |2 |0 |2 |
|0 |1 |0 |1 |
+---------------+---------------+--------+--------+
Полный вывод ниже:
+--------------------------------------+
|common_name |
+--------------------------------------+
|[2024-05-05 03:00:00, D, 94489280512] |
|[2024-05-10 02:00:00, D, 25769803776] |
|[2024-05-10 21:30:00, A, 25769803776] |
|[2024-05-12 12:00:00, D, 60129542144] |
|[2024-05-13 02:00:00, A, 60129542144] |
|[2024-05-14 01:00:00, D, 163208757248]|
|[2024-05-14 01:32:00, D, 128849018880]|
|[2024-05-14 03:30:00, A, 94489280512] |
|[2024-05-14 23:30:00, A, 128849018880]|
|[2024-05-15 01:30:00, A, 163208757248]|
+--------------------------------------+
+-------------------+----------+----+------------+---------+
|timestamp |long_ts |type|value |dept_flag|
+-------------------+----------+----+------------+---------+
|2024-05-05 03:00:00|1714858200|D |94489280512 |1 |
|2024-05-10 02:00:00|1715286600|D |25769803776 |1 |
|2024-05-10 21:30:00|1715356800|A |25769803776 |0 |
|2024-05-12 12:00:00|1715495400|D |60129542144 |1 |
|2024-05-13 02:00:00|1715545800|A |60129542144 |0 |
|2024-05-14 01:00:00|1715628600|D |163208757248|1 |
|2024-05-14 01:32:00|1715630520|D |128849018880|1 |
|2024-05-14 03:30:00|1715637600|A |94489280512 |0 |
|2024-05-14 23:30:00|1715709600|A |128849018880|0 |
|2024-05-15 01:30:00|1715716800|A |163208757248|0 |
+-------------------+----------+----+------------+---------+
+-------------------+----------+----+------------+---------+--------+--------+
|timestamp |long_ts |type|value |dept_flag|calc_t12|calc_t24|
+-------------------+----------+----+------------+---------+--------+--------+
|2024-05-05 03:00:00|1714858200|D |94489280512 |1 |NULL |NULL |
|2024-05-10 02:00:00|1715286600|D |25769803776 |1 |NULL |NULL |
|2024-05-10 21:30:00|1715356800|A |25769803776 |0 |0 |1 |
|2024-05-12 12:00:00|1715495400|D |60129542144 |1 |NULL |NULL |
|2024-05-13 02:00:00|1715545800|A |60129542144 |0 |0 |1 |
|2024-05-14 01:00:00|1715628600|D |163208757248|1 |NULL |NULL |
|2024-05-14 01:32:00|1715630520|D |128849018880|1 |NULL |NULL |
|2024-05-14 03:30:00|1715637600|A |94489280512 |0 |2 |2 |
|2024-05-14 23:30:00|1715709600|A |128849018880|0 |0 |2 |
|2024-05-15 01:30:00|1715716800|A |163208757248|0 |0 |1 |
+-------------------+----------+----+------------+---------+--------+--------+
+---------------+---------------+--------+--------+
|ground_truth_12|ground_truth_24|calc_t12|calc_t24|
+---------------+---------------+--------+--------+
|0 |1 |0 |1 |
|0 |1 |0 |1 |
|2 |2 |2 |2 |
|0 |2 |0 |2 |
|0 |1 |0 |1 |
+---------------+---------------+--------+--------+
Это хорошо работает для 30 миллионов строк, которые я тестировал. Программа занимает около 2,5-4,5 минут.
Если вам нужно сделать это для разных идентификаторов или групп. вы можете разбить соответствующим образом, и в кластере это будет еще быстрее. Я предполагаю, что вы не указали идентификатор, чтобы упростить проблему.
Какого результата вы ожидали? Облегчит понимание того, что вы ищете, если вы также поместите сюда ожидаемый результат.