У меня есть искровой фрейм данных с некоторыми значениями null
в столбце. Мне нужно подсчитать смежные значения null
, предшествующие ненулевому значению.
Используя numpy
, я бы сделал что-то вроде этого (код не оптимизирован для numpy, потому что я стараюсь не использовать его в своей проблеме):
import numpy as np
x = np.array([[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.], [5, 3.],
[6, None], [7, None], [8, 5.], [9, 2.], [10, None]])
def nan_count(l, n):
assert n <= len(l) + 1
assert n >= 0
if n < 1 or l[n-1] is not None:
return 0
return nan_count(l, n-1) + 1
y = map(lambda i: nan_count(x[:,1], i), x[:,0])
res = np.concatenate([x, np.asarray(y).reshape(-1,1)], axis = 1)
res
Чтобы вывод выглядел так:
Out[31]: [0, 1, 0, 0, 1, 0, 0, 1, 2, 0, 0]
Теперь, если у меня есть искровой DataFrame, например x
:
x = sc.parallelize([[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
[5, 3.], [6, None], [7, None], [8, 5.], [9, 2.], [10, None]])\
.toDF()
x.show()
+---+----+
| _1| _2|
+---+----+
| 0|null|
| 1| 3.0|
| 2| 7.0|
| 3|null|
| 4| 4.0|
| 5| 3.0|
| 6|null|
| 7|null|
| 8| 5.0|
| 9| 2.0|
| 10|null|
+---+----+
Как получить тот же результат?
Я уже пробовал некоторые обходные пути с использованием udf
, но у меня есть проблемы со ссылкой на значение перед выбранным (я пытался использовать методы select
и filter
pyspark.sql.dataframe.DataFrame
внутри udf, но это не разрешено).
РЕДАКТИРОВАТЬ: Я не знаю, сколько последовательных nans
я могу найти.
Я добавил комментарии в код, чтобы объяснить каждый шаг, пока не достигну желаемого результата.
Конечно, нет необходимости создавать все столбцы из приведенного ниже примера, и, вероятно, этот код можно значительно улучшить, но я думаю, что важно показать вам шаг за шагом и сделать начальный старт, чтобы решить ваш вопрос.
x = sc.parallelize([
[0, None],
[1, 3.],
[2, 7.],
[3, None],
[4, 4.],
[5, 3.],
[6, None],
[7, None],
[8, 5.],
[9, 2.],
[10, None]
])
# Assigned values in columns A and B to facilitate manipulation
x = x.toDF(['A', 'B'])
# Prints initial DF
x.show()
Выход:
+---+----+
| A| B|
+---+----+
| 0|null|
| 1| 3.0|
| 2| 7.0|
| 3|null|
| 4| 4.0|
| 5| 3.0|
| 6|null|
| 7|null|
| 8| 5.0|
| 9| 2.0|
| 10|null|
+---+----+
# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1))
x.show()
Выход:
+---+----+----+
| A| B| C|
+---+----+----+
| 0|null| 1|
| 1| 3.0|null|
| 2| 7.0|null|
| 3|null| 1|
| 4| 4.0|null|
| 5| 3.0|null|
| 6|null| 1|
| 7|null| 1|
| 8| 5.0|null|
| 9| 2.0|null|
| 10|null| 1|
+---+----+----+
# Creates a spec that order column A
order_spec = Window().orderBy('A')
# Doing a cumulative sum. See the explanation
# https://stackoverflow.com/questions/56384625/pyspark-cumulative-sum-with-reset-condition
x = x \
.withColumn('tmp', sum((x.C.isNull()).cast('int')).over(order_spec)) \
.withColumn('D', sum(x.C).over(order_spec.partitionBy("tmp"))) \
.drop('tmp')
x.show()
Выход:
+---+----+----+----+
| A| B| C| D|
+---+----+----+----+
| 0|null| 1| 1|
| 1| 3.0|null|null|
| 2| 7.0|null|null|
| 3|null| 1| 1|
| 4| 4.0|null|null|
| 5| 3.0|null|null|
| 6|null| 1| 1|
| 7|null| 1| 2|
| 8|null| 1| 3|
| 9| 5.0|null|null|
| 10| 2.0|null|null|
| 11|null| 1| 1|
+---+----+----+----+
# Put values from column D to one row above and select the desired output values
x = x.withColumn('E', lag(x.D, ).over(order_spec)) \
.select(x.A, x.B, when(col('E').isNotNull(), col('E')).otherwise(0).alias('nan_count'))
x.show()
Выход:
+---+----+---------+
| A| B|nan_count|
+---+----+---------+
| 0|null| 0|
| 1| 3.0| 1|
| 2| 7.0| 0|
| 3|null| 0|
| 4| 4.0| 1|
| 5| 3.0| 0|
| 6|null| 0|
| 7|null| 1|
| 8|null| 2|
| 9| 5.0| 3|
| 10| 2.0| 0|
| 11|null| 0|
+---+----+---------+
Весь код:
from pyspark.shell import sc
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, sum, col
x = sc.parallelize([
[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
[5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
x = x.toDF(['A', 'B'])
# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1))
# Creates a spec that order column A
order_spec = Window().orderBy('A')
# Doing a cumulative sum with reset condition. See the explanation
# https://stackoverflow.com/questions/56384625/pyspark-cumulative-sum-with-reset-condition
x = x \
.withColumn('tmp', sum((x.C.isNull()).cast('int')).over(order_spec)) \
.withColumn('D', sum(x.C).over(order_spec.partitionBy("tmp"))) \
.drop('tmp')
# Put values from column D to one row above and select the desired output values
x = x.withColumn('E', lag(x.D, ).over(order_spec)) \
.select(x.A, x.B, when(col('E').isNotNull(), col('E')).otherwise(0).alias('nan_count'))
x.show()
Да, но это был фиктивный набор данных, у меня есть длинные временные ряды с датчика, и бывает, что он пропускает некоторые показания и буферизует значения. У меня могут быть пустые значения в последовательностях длиннее двух, т.е. […,1.5, nan, nan, nan, nan, nan, 6.7, ...]
Это частично работает: я забыл сказать, что у меня может быть больше двух последовательных
nans
: код, который вы отправили, верен до 2 последовательныхnans
, если их больше, он просто пишет2
. Я думаю, проблема в строкеsum_between
вашего кода, я прав?