Оценить размер срезов nans

У меня есть искровой фрейм данных с некоторыми значениями null в столбце. Мне нужно подсчитать смежные значения null, предшествующие ненулевому значению.

Используя numpy, я бы сделал что-то вроде этого (код не оптимизирован для numpy, потому что я стараюсь не использовать его в своей проблеме):

import numpy as np

x = np.array([[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.], [5, 3.], 
              [6, None], [7, None], [8, 5.], [9, 2.], [10, None]])

def nan_count(l, n):
    assert n <= len(l) + 1
    assert n >= 0

    if n < 1 or l[n-1] is not None:
        return 0
    return nan_count(l, n-1) + 1

y = map(lambda i: nan_count(x[:,1], i), x[:,0])
res = np.concatenate([x, np.asarray(y).reshape(-1,1)], axis = 1)
res

Чтобы вывод выглядел так:

Out[31]: [0, 1, 0, 0, 1, 0, 0, 1, 2, 0, 0]

Теперь, если у меня есть искровой DataFrame, например x:

x = sc.parallelize([[0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
                    [5, 3.], [6, None], [7, None], [8, 5.], [9, 2.], [10, None]])\
      .toDF()
x.show()
+---+----+
| _1|  _2|
+---+----+
|  0|null|
|  1| 3.0|
|  2| 7.0|
|  3|null|
|  4| 4.0|
|  5| 3.0|
|  6|null|
|  7|null|
|  8| 5.0|
|  9| 2.0|
| 10|null|
+---+----+

Как получить тот же результат?

Я уже пробовал некоторые обходные пути с использованием udf, но у меня есть проблемы со ссылкой на значение перед выбранным (я пытался использовать методы select и filterpyspark.sql.dataframe.DataFrame внутри udf, но это не разрешено).

РЕДАКТИРОВАТЬ: Я не знаю, сколько последовательных nans я могу найти.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
81
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я добавил комментарии в код, чтобы объяснить каждый шаг, пока не достигну желаемого результата.

Конечно, нет необходимости создавать все столбцы из приведенного ниже примера, и, вероятно, этот код можно значительно улучшить, но я думаю, что важно показать вам шаг за шагом и сделать начальный старт, чтобы решить ваш вопрос.

x = sc.parallelize([
    [0, None],
    [1, 3.],
    [2, 7.],
    [3, None],
    [4, 4.],
    [5, 3.],
    [6, None],
    [7, None],
    [8, 5.],
    [9, 2.],
    [10, None]
])
# Assigned values ​​in columns A and B to facilitate manipulation
x = x.toDF(['A', 'B'])

# Prints initial DF
x.show()

Выход:

+---+----+
|  A|   B|
+---+----+
|  0|null|
|  1| 3.0|
|  2| 7.0|
|  3|null|
|  4| 4.0|
|  5| 3.0|
|  6|null|
|  7|null|
|  8| 5.0|
|  9| 2.0|
| 10|null|
+---+----+
# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1))
x.show()

Выход:

+---+----+----+
|  A|   B|   C|
+---+----+----+
|  0|null|   1|
|  1| 3.0|null|
|  2| 7.0|null|
|  3|null|   1|
|  4| 4.0|null|
|  5| 3.0|null|
|  6|null|   1|
|  7|null|   1|
|  8| 5.0|null|
|  9| 2.0|null|
| 10|null|   1|
+---+----+----+
# Creates a spec that order column A
order_spec = Window().orderBy('A')

# Doing a cumulative sum. See the explanation
# https://stackoverflow.com/questions/56384625/pyspark-cumulative-sum-with-reset-condition
x = x \
    .withColumn('tmp', sum((x.C.isNull()).cast('int')).over(order_spec)) \
    .withColumn('D', sum(x.C).over(order_spec.partitionBy("tmp"))) \
    .drop('tmp')
x.show()

Выход:

+---+----+----+----+
|  A|   B|   C|   D|
+---+----+----+----+
|  0|null|   1|   1|
|  1| 3.0|null|null|
|  2| 7.0|null|null|
|  3|null|   1|   1|
|  4| 4.0|null|null|
|  5| 3.0|null|null|
|  6|null|   1|   1|
|  7|null|   1|   2|
|  8|null|   1|   3|
|  9| 5.0|null|null|
| 10| 2.0|null|null|
| 11|null|   1|   1|
+---+----+----+----+
# Put values from column D to one row above and select the desired output values
x = x.withColumn('E', lag(x.D, ).over(order_spec)) \
    .select(x.A, x.B, when(col('E').isNotNull(), col('E')).otherwise(0).alias('nan_count'))
x.show()

Выход:

+---+----+---------+
|  A|   B|nan_count|
+---+----+---------+
|  0|null|        0|
|  1| 3.0|        1|
|  2| 7.0|        0|
|  3|null|        0|
|  4| 4.0|        1|
|  5| 3.0|        0|
|  6|null|        0|
|  7|null|        1|
|  8|null|        2|
|  9| 5.0|        3|
| 10| 2.0|        0|
| 11|null|        0|
+---+----+---------+

Весь код:

from pyspark.shell import sc
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, sum, col

x = sc.parallelize([
    [0, None], [1, 3.], [2, 7.], [3, None], [4, 4.],
    [5, 3.], [6, None], [7, None], [8, None], [9, 5.], [10, 2.], [11, None]])
x = x.toDF(['A', 'B'])

# Transform null values into "1"
x = x.withColumn('C', when(x.B.isNull(), 1))

# Creates a spec that order column A
order_spec = Window().orderBy('A')

# Doing a cumulative sum with reset condition. See the explanation
# https://stackoverflow.com/questions/56384625/pyspark-cumulative-sum-with-reset-condition
x = x \
    .withColumn('tmp', sum((x.C.isNull()).cast('int')).over(order_spec)) \
    .withColumn('D', sum(x.C).over(order_spec.partitionBy("tmp"))) \
    .drop('tmp')

# Put values from column D to one row above and select the desired output values
x = x.withColumn('E', lag(x.D, ).over(order_spec)) \
    .select(x.A, x.B, when(col('E').isNotNull(), col('E')).otherwise(0).alias('nan_count'))
x.show()

Это частично работает: я забыл сказать, что у меня может быть больше двух последовательных nans: код, который вы отправили, верен до 2 последовательных nans, если их больше, он просто пишет 2. Я думаю, проблема в строке sum_between вашего кода, я прав?

Giacomo Sachs 30.05.2019 16:29

Да, но это был фиктивный набор данных, у меня есть длинные временные ряды с датчика, и бывает, что он пропускает некоторые показания и буферизует значения. У меня могут быть пустые значения в последовательностях длиннее двух, т.е. […,1.5, nan, nan, nan, nan, nan, 6.7, ...]

Giacomo Sachs 30.05.2019 16:56

Другие вопросы по теме