помогите пожалуйста... У меня есть такие данные:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
from pyspark.sql.functions import substring, length
dept = [("A",1,"2020-11-07 23:19:12"), ("A",1,"2020-11-07 23:19:16"), ("A",1,"2020-11-07 23:19:56"), ("A",0,"2020-11-07 23:20:37"), ("A",0,"2020-11-07 23:21:06"), ("A",0,"2020-11-07 23:21:47"), ("A",1,"2020-11-07 23:22:05"), ("A",1,"2020-11-07 23:22:30"),("A",1,"2020-11-07 23:23:00"), ("B",1,"2020-11-07 22:19:12"), ("B",1,"2020-11-07 22:20:10"), ("B",0,"2020-11-07 22:21:31"), ("B",0,"2020-11-07 22:22:01"), ("B",0,"2020-11-07 22:22:45"), ("B",1,"2020-11-07 22:23:52"), ("B",1,"2020-11-07 22:24:10")]
deptColumns = ["Id","BAP","Time"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)
deptDF.show()
Это необходимо сделать для каждой серии нулей, составляющих каждый идентификатор. Таким образом, мы можем иметь несколько DeltaTime для одного и того же ID, если есть несколько серий нулей.
На самом деле, я могу вычислить дельту времени между последовательными строками:
Delta=deptDF.withColumn("DeltaTime",(deptDF.Time.cast("bigint") - lag(deptDF.Time.cast("bigint"),1).over(Window.partitionBy("Id").orderBy("Time")).cast("bigint")))
Delta.show()
можно ли добавить любое условие, чтобы получить ожидаемый результат?
Сообщение отредактировано.. проверьте новую версию
Добавьте два столбца begin0 и begin1, чтобы упростить анализ данных с помощью оконных функций:
import pyspark.sql.functions as F
window = Window.partitionBy('Id').orderBy('Time')
Delta = deptDF.withColumn(
'begin0',
(F.lag('BAP').over(window) != 0) & (F.col('BAP') == 0)
).withColumn(
'begin1',
(F.lag('BAP').over(window) == 0) & (F.col('BAP') == 1)
).filter(
'begin0 or begin1'
).withColumn(
'DeltaTime',
F.when(
F.col('BAP') == 0,
F.date_format(
(
F.lead('Time').over(window).cast('timestamp').cast('bigint') -
F.col('Time').cast('timestamp').cast('bigint')
).cast('timestamp'),
'HH:mm:ss'
)
).otherwise(
F.lit('00:00:00')
)
).drop(
'begin0', 'begin1'
).orderBy(
'Id','Time'
)
Delta.show()
+---+---+-------------------+---------+
| Id|BAP| Time|DeltaTime|
+---+---+-------------------+---------+
| A| 0|2020-11-07 23:20:37| 00:01:28|
| A| 1|2020-11-07 23:22:05| 00:00:00|
| B| 0|2020-11-07 22:21:31| 00:02:21|
| B| 1|2020-11-07 22:23:52| 00:00:00|
+---+---+-------------------+---------+
Добро пожаловать в СО. Пожалуйста, покажите свои усилия. SO не является системой разгрузки работы.