Я использую Spark 3.0.1, и это пример моего DataFrame pyspark:
| label| amount| bool |
-----------------------------
| a | 10 | false |
| a | 2 | false |
| b | 20 | true |
| c | 3 | true |
| d | 2 | false |
| f | 5 | false |
| w | 50 | true |
...
...
Это код, который я использовал для создания вышеупомянутого примера:
df = spark.createDataFrame(pd.DataFrame({
'label': ["a", "a", "b", "c", "d", "f", "w"],
'amount': [10, 2, 20, 3, 2, 5, 50],
'bool': [False, False, True, True, False, False, True]
}))
Я хотел бы выполнить задачу, которая кажется мне очень простой, но которую я не могу выполнить.
В частности, я хотел бы:
label
(уже предполагается в примере)true_label
значения как таковые:
label
, если bool
равно false
label
) label
уже встречался с false
в bool
Обновление предыдущего примера должно помочь лучше понять ожидаемый результат:
| label| amount| bool | real_label |
-----------------------------------
| a | 10 | false | a | <- because `bool` is false, `real_label` = `label`
| a | 2 | false | a | <- because `bool` is false, `real_label` = `label`
| b | 20 | true | a | <- because `a` the latest `label` with a `false` in `bool`
| c | 3 | true | a | <- because `a` the latest `label` with a `false` in `bool`
| d | 2 | false | d | <- because `bool` is false, `real_label` = `label`
| f | 5 | false | f | <- because `bool` is false, `real_label` = `label`
| w | 50 | true | f | <- because `f` the latest `label` with a `false` in `bool`
...
...
Можно ли достичь того, чего я хочу, не зная количества последовательных false
, которые я могу встретить, а также учитывая, что реальный фрейм данных очень большой и имеет значение производительность (поэтому ответы на основе toPandas, к сожалению, сомнительны, и также было бы лучше избегать udf
функций) ?
Используйте оконную функцию last
, чтобы получить предыдущую «ложную метку», если она верна, в противном случае сохраните метку.
from pyspark.sql import functions as F, Window
df2 = df.withColumn(
'real_label',
F.when(
F.col('bool'), # get previous false label if true
F.last(
F.when(~F.col('bool'), F.col('label')), # keep false labels and mask true labels with null
ignorenulls=True
).over(Window.orderBy('label'))
).otherwise(F.col('label')) # otherwise keep label if false
)
df2.show()
+-----+------+-----+----------+
|label|amount| bool|real_label|
+-----+------+-----+----------+
| a| 10|false| a|
| a| 2|false| a|
| b| 20| true| a|
| c| 3| true| a|
| d| 2|false| d|
| f| 5|false| f|
| w| 50| true| f|
+-----+------+-----+----------+
yourDF.createOrReplaceTempView("tmp_view")
yourTransformedDF = spark.sql("""SELECT
label,
amount,
bool,
label2,
CASE WHEN bool THEN LAG(COALESCE(label2, label)) OVER (ORDER BY label)
ELSE label
END AS real_label
FROM (
SELECT
label,
amount,
bool,
case when bool then LAG(label) OVER (ORDER BY label) else label end as label2
FROM tmp_view) q""")
Потрясающе ясно, сэр. Именно этого я и хотел добиться. Простой и производительный. Последняя функция не бросилась мне в глаза, и мне бы не пришло в голову использовать ее таким элегантным способом. Спасибо.