Я возился с некоторым кодом pyspark и заметил поведение, которое я не понимаю и не ожидаю. Я сделал некоторый код, который генерирует случайные данные, двоичные числа в другом столбце и фильтрует случайно сгенерированные данные. Однако, когда я выполняю .show() несколько раз, я часто вижу данные, которые не соответствуют моему условию. Кажется, что моя функция случайных сгенерированных данных вызывается два раза; один раз до фильтра и один раз после фильтра. Я пытался использовать .explain(), но не понимаю, как его читать или интерпретировать. Я также смущен тем, в каком порядке я должен читать вывод .explain()
Может ли кто-нибудь помочь мне понять с помощью .explain() простой код, который я сделал, и который работает совсем не так, как я ожидал?
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
_random_udf = udf(lambda x: int(random.randint(0, 1)), IntegerType())
inputDf = spark.createDataFrame([{'row': i} for i in range(10)])
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col('status') != 0)
non_zero_filter.show()
пример вывода с использованием .show(). Я не ожидал увидеть строки с 0s
в столбце status
.
Вывод .explain() следующий
non_zero_filter.explain()
== Physical Plan ==
== Physical Plan ==
*(3) Project [row#296L, pythonUDF0#314 AS status#299]
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#314]
+- *(2) Project [row#296L]
+- *(2) Filter NOT (pythonUDF0#313 = 0)
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#313]
+- *(1) Scan ExistingRDD[row#296L]
Изменение результата связано с тем, что вы генерируете случайные целые числа с помощью модуля Return random.randint
. Результатом каждого запуска является новый random_result
фрейм данных.
Давайте посмотрим на план. Вы напечатали .explain()
, что означает, что вы не просматривали расширенный план. Вы просили только physical plan
.Печать .explain(True)
. даст вам подробный план.
Физический план указывает, как логический план Sparks будет выполняться в кластере. Под df находится RDD. Проще говоря, код pyspark компилируется в RDD.
Так что в этом случае, чтобы отфильтровать нужные вам строки, spark сканирует RDD. Затем он применяет фильтр и проекты. Таким образом, в этом случае есть три работы. Это обозначено номером в скобках со звездочкой. Вы можете просмотреть сведения о каждом задании, взглянув на пользовательский интерфейс искры.
filter
поведение см. в комментарии @Emma ниже
только что сделал. Кстати, where
также не должен гарантировать работу. та же проблема, что и filter
для недетерминированного возвращаемого значения.
+1. UDF OP возвращает недетерминированный результат, поэтому ему необходимо вызвать
asNondeterministic
func, чтобы катализатор Spark мог правильно оптимизировать. Ссылка: раздел заметок здесь: spark.apache.org/docs/3.1.3/api/python/reference/api/…