Вычисление количества скользящих чисел из двух разных столбцов временных рядов в pyspark

У меня есть фрейм данных pyspark, содержащий два столбца. Прибытие и отъезд. Идея состоит в том, чтобы вычислить количество событий отправления, попадающих в указанный интервал, рассчитанный на основе времени прибытия. Например, если товар прибыл в 23:00, я хотел бы взять окно продолжительностью -12 часов [11:00, 23:00] и вычислить количество товаров, ушедших в течение этого интервала времени.

Вот мой код для его создания. Но, как вы видите, это не работает, поскольку я могу агрегировать часы либо по столбцу прибытия_времени, либо по столбцу dept_time.

from pyspark.sql.window import Window
spark = SparkSession.builder.appName("rolling_window_example").getOrCreate()

# Sample data
data = [
    ("2024-05-10 02:00:00", "2024-05-10 21:30:00", 0, 1),
    ("2024-05-12 14:10:00", "2024-05-13 02:00:00", 1, 1),
    ("2024-05-05 03:00:00", "2024-05-14 03:30:00", 2, 2),
    ("2024-05-14 01:32:00", "2024-05-14 23:30:00", 0, 2),
    ("2024-05-14 01:00:00", "2024-05-15 01:30:00", 0, 1)
]

columns = ["dept_time", "arrival_time", "ground_truth_12", "ground_truth_24"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df = df.withColumn("dept_timestamp", col("dept_time").cast("timestamp"))
df = df.withColumn("arrival_timestamp", col("arrival_time").cast("timestamp"))
df = df.withColumn("dept_time", col("dept_time").cast("timestamp").cast("long"))
df = df.withColumn("arrival_time", col("arrival_time").cast("timestamp").cast("long"))

# Calculate windows wrt arrival time
window_12 = Window.partitionBy().orderBy("arrival_time").rangeBetween(-12 * 3600, Window.currentRow)
window_24 = Window.partitionBy().orderBy("arrival_time").rangeBetween(-24 * 3600, Window.currentRow)

df_rolling = df \
    .orderBy("dept_time") \
    .withColumn("t_12_count", f.count("dept_time").over(window_12)) \
    .withColumn("t_24_count", f.count("dept_time").over(window_24))
                
# Show results
display(df_rolling)

Результат не такой, как ожидалось:

+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
| dept_time|arrival_time|ground_truth_12|ground_truth_24|     dept_timestamp|  arrival_timestamp|t_12_count|t_24_count|
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+
|1715306400|  1715376600|              0|              1|2024-05-10 02:00:00|2024-05-10 21:30:00|         1|         1|
|1715523000|  1715565600|              1|              1|2024-05-12 14:10:00|2024-05-13 02:00:00|         1|         1|
|1714878000|  1715657400|              2|              2|2024-05-05 03:00:00|2024-05-14 03:30:00|         1|         1|
|1715650320|  1715729400|              0|              2|2024-05-14 01:32:00|2024-05-14 23:30:00|         1|         2|
|1715648400|  1715736600|              0|              1|2024-05-14 01:00:00|2024-05-15 01:30:00|         2|         3|
+----------+------------+---------------+---------------+-------------------+-------------------+----------+----------+

Ожидаемый результат можно увидеть в столбцах: ground_truth_12 и ground_truth_24 для 12-часового и 24-часового окна соответственно.

Какого результата вы ожидали? Облегчит понимание того, что вы ищете, если вы также поместите сюда ожидаемый результат.

nihil0 17.05.2024 07:34

ground_truth_12 и ground_truth_24 обозначают результат :)

Muhammad Irfan Ali 17.05.2024 08:08

Почему вы публикуете решение через чатгпт? Опубликуйте свою реальную проблему с данными. Выполните расчеты вручную. И дайте знать, почему этот расчет верен в соответствии с вашими требованиями.

user238607 17.05.2024 11:50

Это не решение чатгпта. Это работаю сам.

Muhammad Irfan Ali 17.05.2024 12:36

@MuhammadIrfanAli: Я опубликовал решение. дайте мне знать, как это сработает для вас. Сколько времени потребуется на выполнение и т.д.

user238607 19.05.2024 07:55

@user238607 user238607 большое спасибо. Я оцениваю решение и скоро прокомментирую.

Muhammad Irfan Ali 20.05.2024 12:48
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
6
117
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вот умный способ выяснить все отправления до прибытия текущих строк.

Отметьте соответствующее время флажком прибытия, например "A" или отправления "D".

Теперь объедините эти два кадра данных.

Упорядочите эти кадры данных по времени независимо от метки.

Создайте спецификацию окна, которая будет подсчитывать все строки "D", встречающиеся в окне от -12 часов до 0 с (текущая строка/время).

Выполняйте описанную выше операцию только для "A" строк, поскольку именно это нас волнует в конечном результате.

Аналогично для окна -24 часа.

Важная заметка :

Во второй строке есть ошибка, которую я исправил ниже.

Ниже приведен рабочий пример.

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession.builder.appName("SelfJoinExample").getOrCreate()

## There is error in the second row grount truth which I have corrected below.

data = [
    ("2024-05-10 02:00:00", "2024-05-10 21:30:00", 0, 1),
    ("2024-05-12 12:00:00", "2024-05-13 02:00:00", 0, 1),
    ("2024-05-05 03:00:00", "2024-05-14 03:30:00", 2, 2),
    ("2024-05-14 01:32:00", "2024-05-14 23:30:00", 0, 2),
    ("2024-05-14 01:00:00", "2024-05-15 01:30:00", 0, 1)
]

columns = ["departure_time", "arrival_time", "ground_truth_12", "ground_truth_24"]

# Create DataFrame
df = spark.createDataFrame(data, columns)
df = df.withColumn("departure_timestamp", col("departure_time").cast("timestamp")).drop("departure_time")
df = df.withColumn("arrival_timestamp", col("arrival_time").cast("timestamp")).drop("arrival_time")
df = df.withColumn("mono_id", monotonically_increasing_id())
df = df.withColumn("arrival_label", array(col("arrival_timestamp"), lit("A"), col("mono_id")))
df = df.withColumn("dept_label", array(col("departure_timestamp"), lit("D"), col("mono_id")))

df_arrival = df.select(col("arrival_label").alias("common_name"))
df_dept = df.select(col("dept_label").alias("common_name"))

df_union = df_arrival.union(df_dept)

df_union = df_union.orderBy(col("common_name")[0])
df_union.show(n=1000, truncate=False)


df_mid = df_union.withColumn("timestamp", to_timestamp(df_union["common_name"][0]))
df_mid = df_mid.withColumn("long_ts", col("timestamp").cast("long"))
df_mid = df_mid.withColumn("type", df_union["common_name"][1])
df_mid = df_mid.withColumn("value", df_union["common_name"][2].cast(LongType()))
df_mid = df_mid.drop("common_name")
df_mid = df_mid.withColumn("dept_flag", when(col("type") == "D", 1).otherwise(0))

df_mid.show(truncate=False)


windowSpec12 = Window.orderBy("long_ts").rangeBetween(-12 * 3600, 0)
windowSpec24 = Window.orderBy("long_ts").rangeBetween(-24 * 3600, 0)



df_int = df_mid.withColumn("calc_t12",  when(col("type") == "A", sum("dept_flag").over(windowSpec12)).otherwise(None))
df_int = df_int.withColumn("calc_t24",  when(col("type") == "A", sum("dept_flag").over(windowSpec24)).otherwise(None))

df_int.show(n=1000, truncate=False)

df_crosscheck = df.join(df_int, on=[col("type") == "A", col("mono_id") == col("value")], how = "inner")

print("Final Result")
df_crosscheck.select("ground_truth_12", "ground_truth_24", "calc_t12", "calc_t24").show(n=1000, truncate=False)

Окончательный фрейм данных перекрестной проверки:

+---------------+---------------+--------+--------+
|ground_truth_12|ground_truth_24|calc_t12|calc_t24|
+---------------+---------------+--------+--------+
|0              |1              |0       |1       |
|0              |1              |0       |1       |
|2              |2              |2       |2       |
|0              |2              |0       |2       |
|0              |1              |0       |1       |
+---------------+---------------+--------+--------+

Полный вывод ниже:

+--------------------------------------+
|common_name                           |
+--------------------------------------+
|[2024-05-05 03:00:00, D, 94489280512] |
|[2024-05-10 02:00:00, D, 25769803776] |
|[2024-05-10 21:30:00, A, 25769803776] |
|[2024-05-12 12:00:00, D, 60129542144] |
|[2024-05-13 02:00:00, A, 60129542144] |
|[2024-05-14 01:00:00, D, 163208757248]|
|[2024-05-14 01:32:00, D, 128849018880]|
|[2024-05-14 03:30:00, A, 94489280512] |
|[2024-05-14 23:30:00, A, 128849018880]|
|[2024-05-15 01:30:00, A, 163208757248]|
+--------------------------------------+

+-------------------+----------+----+------------+---------+
|timestamp          |long_ts   |type|value       |dept_flag|
+-------------------+----------+----+------------+---------+
|2024-05-05 03:00:00|1714858200|D   |94489280512 |1        |
|2024-05-10 02:00:00|1715286600|D   |25769803776 |1        |
|2024-05-10 21:30:00|1715356800|A   |25769803776 |0        |
|2024-05-12 12:00:00|1715495400|D   |60129542144 |1        |
|2024-05-13 02:00:00|1715545800|A   |60129542144 |0        |
|2024-05-14 01:00:00|1715628600|D   |163208757248|1        |
|2024-05-14 01:32:00|1715630520|D   |128849018880|1        |
|2024-05-14 03:30:00|1715637600|A   |94489280512 |0        |
|2024-05-14 23:30:00|1715709600|A   |128849018880|0        |
|2024-05-15 01:30:00|1715716800|A   |163208757248|0        |
+-------------------+----------+----+------------+---------+

+-------------------+----------+----+------------+---------+--------+--------+
|timestamp          |long_ts   |type|value       |dept_flag|calc_t12|calc_t24|
+-------------------+----------+----+------------+---------+--------+--------+
|2024-05-05 03:00:00|1714858200|D   |94489280512 |1        |NULL    |NULL    |
|2024-05-10 02:00:00|1715286600|D   |25769803776 |1        |NULL    |NULL    |
|2024-05-10 21:30:00|1715356800|A   |25769803776 |0        |0       |1       |
|2024-05-12 12:00:00|1715495400|D   |60129542144 |1        |NULL    |NULL    |
|2024-05-13 02:00:00|1715545800|A   |60129542144 |0        |0       |1       |
|2024-05-14 01:00:00|1715628600|D   |163208757248|1        |NULL    |NULL    |
|2024-05-14 01:32:00|1715630520|D   |128849018880|1        |NULL    |NULL    |
|2024-05-14 03:30:00|1715637600|A   |94489280512 |0        |2       |2       |
|2024-05-14 23:30:00|1715709600|A   |128849018880|0        |0       |2       |
|2024-05-15 01:30:00|1715716800|A   |163208757248|0        |0       |1       |
+-------------------+----------+----+------------+---------+--------+--------+

+---------------+---------------+--------+--------+
|ground_truth_12|ground_truth_24|calc_t12|calc_t24|
+---------------+---------------+--------+--------+
|0              |1              |0       |1       |
|0              |1              |0       |1       |
|2              |2              |2       |2       |
|0              |2              |0       |2       |
|0              |1              |0       |1       |
+---------------+---------------+--------+--------+

Это хорошо работает для 30 миллионов строк, которые я тестировал. Программа занимает около 2,5-4,5 минут.

Muhammad Irfan Ali 20.05.2024 15:47

Если вам нужно сделать это для разных идентификаторов или групп. вы можете разбить соответствующим образом, и в кластере это будет еще быстрее. Я предполагаю, что вы не указали идентификатор, чтобы упростить проблему.

user238607 20.05.2024 18:37

Другие вопросы по теме