Как интерпретировать pyspark .explain() и как pyspark упорядочивает операции

Я возился с некоторым кодом pyspark и заметил поведение, которое я не понимаю и не ожидаю. Я сделал некоторый код, который генерирует случайные данные, двоичные числа в другом столбце и фильтрует случайно сгенерированные данные. Однако, когда я выполняю .show() несколько раз, я часто вижу данные, которые не соответствуют моему условию. Кажется, что моя функция случайных сгенерированных данных вызывается два раза; один раз до фильтра и один раз после фильтра. Я пытался использовать .explain(), но не понимаю, как его читать или интерпретировать. Я также смущен тем, в каком порядке я должен читать вывод .explain()

Может ли кто-нибудь помочь мне понять с помощью .explain() простой код, который я сделал, и который работает совсем не так, как я ожидал?

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, udf
import random
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


_random_udf = udf(lambda x: int(random.randint(0, 1)), IntegerType())

inputDf = spark.createDataFrame([{'row': i} for i in range(10)])
random_result = inputDf.withColumn("status", _random_udf(col("row")))
non_zero_filter = random_result.filter(col('status') != 0)

non_zero_filter.show()

пример вывода с использованием .show(). Я не ожидал увидеть строки с 0s в столбце status.

ряд положение дел 2 1 3 0 5 0 6 1 7 0

Вывод .explain() следующий

non_zero_filter.explain()

== Physical Plan ==
== Physical Plan ==
*(3) Project [row#296L, pythonUDF0#314 AS status#299]
+- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#314]
   +- *(2) Project [row#296L]
      +- *(2) Filter NOT (pythonUDF0#313 = 0)
         +- BatchEvalPython [<lambda>(row#296L)], [pythonUDF0#313]
            +- *(1) Scan ExistingRDD[row#296L]
Шаблоны Angular PrimeNg
Шаблоны Angular PrimeNg
Как привнести проверку типов в наши шаблоны Angular, использующие компоненты библиотеки PrimeNg, и настроить их отображение с помощью встроенной...
Создайте ползком, похожим на звездные войны, с помощью CSS и Javascript
Создайте ползком, похожим на звездные войны, с помощью CSS и Javascript
Если вы веб-разработчик (или хотите им стать), то вы наверняка гик и вам нравятся "Звездные войны". А как бы вы хотели, чтобы фоном для вашего...
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
Начала с розового дизайна
Начала с розового дизайна
Pink Design - это система дизайна Appwrite с открытым исходным кодом для создания последовательных и многократно используемых пользовательских...
Шлюз в PHP
Шлюз в PHP
API-шлюз (AG) - это сервер, который действует как единая точка входа для набора микросервисов.
14 Задание: Типы данных и структуры данных Python для DevOps
14 Задание: Типы данных и структуры данных Python для DevOps
проверить тип данных используемой переменной, мы можем просто написать: your_variable=100
2
0
66
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Изменение результата связано с тем, что вы генерируете случайные целые числа с помощью модуля Return random.randint. Результатом каждого запуска является новый random_result фрейм данных.

Давайте посмотрим на план. Вы напечатали .explain(), что означает, что вы не просматривали расширенный план. Вы просили только physical plan.Печать .explain(True). даст вам подробный план.

Физический план указывает, как логический план Sparks будет выполняться в кластере. Под df находится RDD. Проще говоря, код pyspark компилируется в RDD.

Так что в этом случае, чтобы отфильтровать нужные вам строки, spark сканирует RDD. Затем он применяет фильтр и проекты. Таким образом, в этом случае есть три работы. Это обозначено номером в скобках со звездочкой. Вы можете просмотреть сведения о каждом задании, взглянув на пользовательский интерфейс искры.

filter поведение см. в комментарии @Emma ниже

+1. UDF OP возвращает недетерминированный результат, поэтому ему необходимо вызвать asNondeterministic func, чтобы катализатор Spark мог правильно оптимизировать. Ссылка: раздел заметок здесь: spark.apache.org/docs/3.1.3/api/python/reference/api/…

Emma 21.11.2022 23:09

только что сделал. Кстати, where также не должен гарантировать работу. та же проблема, что и filter для недетерминированного возвращаемого значения.

Emma 21.11.2022 23:20

Другие вопросы по теме