Вычислите разницу во времени в соответствии с условием и для больших данных с помощью Pyspark

помогите пожалуйста... У меня есть такие данные:

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
from pyspark.sql.functions import substring, length
dept = [("A",1,"2020-11-07 23:19:12"), ("A",1,"2020-11-07 23:19:16"), ("A",1,"2020-11-07 23:19:56"), ("A",0,"2020-11-07 23:20:37"), ("A",0,"2020-11-07 23:21:06"), ("A",0,"2020-11-07 23:21:47"), ("A",1,"2020-11-07 23:22:05"), ("A",1,"2020-11-07 23:22:30"),("A",1,"2020-11-07 23:23:00"), ("B",1,"2020-11-07 22:19:12"), ("B",1,"2020-11-07 22:20:10"), ("B",0,"2020-11-07 22:21:31"), ("B",0,"2020-11-07 22:22:01"), ("B",0,"2020-11-07 22:22:45"), ("B",1,"2020-11-07 22:23:52"), ("B",1,"2020-11-07 22:24:10")]
deptColumns = ["Id","BAP","Time"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show()

  1. С Pyspark, как получить время для первого 0 из первой серии нулей для каждого идентификатора и получить время первой 1 сразу после той же серии нулей. Затем сделайте вычитание time_stamp между ними. Что-то вроде этого:

Это необходимо сделать для каждой серии нулей, составляющих каждый идентификатор. Таким образом, мы можем иметь несколько DeltaTime для одного и того же ID, если есть несколько серий нулей.

На самом деле, я могу вычислить дельту времени между последовательными строками:

Delta=deptDF.withColumn("DeltaTime",(deptDF.Time.cast("bigint") - lag(deptDF.Time.cast("bigint"),1).over(Window.partitionBy("Id").orderBy("Time")).cast("bigint")))
Delta.show()

можно ли добавить любое условие, чтобы получить ожидаемый результат?

Добро пожаловать в СО. Пожалуйста, покажите свои усилия. SO не является системой разгрузки работы.

ShlomiF 09.12.2020 23:32

Сообщение отредактировано.. проверьте новую версию

Data Tao 09.12.2020 23:44
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
107
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Добавьте два столбца begin0 и begin1, чтобы упростить анализ данных с помощью оконных функций:

import pyspark.sql.functions as F

window = Window.partitionBy('Id').orderBy('Time')
Delta = deptDF.withColumn(
    'begin0',
    (F.lag('BAP').over(window) != 0) & (F.col('BAP') == 0)
).withColumn(
    'begin1',
    (F.lag('BAP').over(window) == 0) & (F.col('BAP') == 1)
).filter(
    'begin0 or begin1'
).withColumn(
    'DeltaTime',
    F.when(
        F.col('BAP') == 0,
        F.date_format(
            (
                F.lead('Time').over(window).cast('timestamp').cast('bigint') -
                F.col('Time').cast('timestamp').cast('bigint')
            ).cast('timestamp'),
           'HH:mm:ss'
       )
    ).otherwise(
        F.lit('00:00:00')
    )
).drop(
    'begin0', 'begin1'
).orderBy(
    'Id','Time'
)

Delta.show()
+---+---+-------------------+---------+
| Id|BAP|               Time|DeltaTime|
+---+---+-------------------+---------+
|  A|  0|2020-11-07 23:20:37| 00:01:28|
|  A|  1|2020-11-07 23:22:05| 00:00:00|
|  B|  0|2020-11-07 22:21:31| 00:02:21|
|  B|  1|2020-11-07 22:23:52| 00:00:00|
+---+---+-------------------+---------+

Другие вопросы по теме