Преобразования, связанные с производительностью PySpark, и последовательное переназначение

Я новичок в PySpark. Я понимаю, что он использует отложенную оценку, а это означает, что выполнение серии преобразований будет отложено до тех пор, пока не будет запрошено какое-либо действие, после чего движок Spark оптимизирует весь набор преобразований, а затем выполнит их.

Поэтому как с функциональной точки зрения, так и с точки зрения производительности я ожидаю следующего:

Подход А

df = spark.read.parquet(path) 
df = df.filter(F.col('state') == 'CA')
df = df.select('id', 'name', 'subject')
df = df.groupBy('subject').count()

быть таким же, как это:

Подход Б

df = spark.read.parquet(path)\
    .filter(F.col('state') == 'CA')\
    .select('id', 'name', 'subject')\
    .groupBy('subject').count()

Мне нравится стиль подхода А, потому что он позволяет мне разбить мою логику на более мелкие (и повторно используемые) функции.

Однако я наткнулся на несколько сообщений в блоге (например, здесь , здесь и здесь), которые меня смутили. Эти сообщения посвящены использованию последовательных операторов withColumn и предлагают вместо этого использовать одиночный select. Основная причина, по-видимому, заключается в том, что, поскольку фреймы данных неизменяемы, последовательное использование withColumn вредно для производительности.

Итак, мой вопрос: возникнет ли у меня такая же проблема с производительностью при использовании подхода А? Или это просто проблема, специфичная для withColumn?

Обновленный ответ, то же самое. Попробуй это.

thebluephantom 29.03.2024 15:22

Строго говоря, я ответил на вопрос. На самом деле у вас есть два вопроса под разными углами зрения.

thebluephantom 29.03.2024 17:35

@thebluephantom извини, я не понял: «На самом деле у тебя есть два вопроса, которые под разными углами зрения». Что вы подразумеваете под «расходящимися углами»? Могу ли я предоставить дополнительную информацию, которая поможет прояснить вопрос?

teejay 29.03.2024 18:17

Ничего страшного. Я подробнее рассмотрю этот вопрос позже.

thebluephantom 29.03.2024 18:23

Добавил в ответ. Второй ответ на самом деле,

thebluephantom 30.03.2024 09:46
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
5
454
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Подход А:

  • При таком подходе каждая строка создает новый DataFrame, а предыдущая DataFrame заменяется на новый.
  • Это может привести к проблемам с производительностью, особенно с большими наборами данных. из-за накладных расходов на создание и управление DataFrame
  • Последовательные операторы withColumn могут привести к проблемам с производительностью в PySpark из-за неизменяемости DataFrames.

Подход Б:

  • Вместо использования последовательных операторов withColumn вы можете использовать один оператор выбора для достижения того же результата.

  • Он выполняет преобразования за один проход через данные.

  • Каждая операция в цепочке применяется непосредственно к DataFrame
    возвращается предыдущей операцией без создания промежуточного
    DataFrames.

Подход B обычно предпочтительнее из-за его большей производительности и более чистого и лаконичного кода.

Производительность равная.

thebluephantom 29.03.2024 15:26
Ответ принят как подходящий

Собственно есть 2 вопроса.

Первый вопрос с образцами кодов. В отличие от первого ответа, я не согласен.

Попробуйте оба сегмента кода со знаком .explain(), и вы увидите, что сгенерированный физический план выполнения абсолютно одинаков.

Spark основан на lazy evaluation. То есть:

Все преобразования в Spark являются ленивыми, поскольку они не вычисляют их результаты сразу. Вместо этого они просто помнят преобразования, примененные к некоторому базовому набору данных (например, файлу). преобразования вычисляются только тогда, когда действие требует результата для вернуться в программу драйвера. Эта конструкция позволяет Spark работать более эффективно. Например, мы можем понять, что набор данных, созданный Through Map будет использоваться при сокращении и возвращать только результат сократить до драйвера, а не до большего сопоставленного набора данных.

В результате всего этого я запустил код, аналогичный вашему, с двумя примененными фильтрами, и обратите внимание, что, поскольку действие .count вызывает своевременную оценку, Catalyst отфильтровывает данные на основе как первого, так и второго фильтра. Это известно как "Code Fusing" — что можно сделать для позднего выполнения, т. е. ленивой оценки.

Фрагмент 1 и физический план

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)

df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.filter(col('lastname') == 'Jones2')
df = df.groupBy('lastname').count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#212], functions=[finalmerge_count(merge count#233L) AS count(1)#228L])
   +- Exchange hashpartitioning(lastname#212, 200), ENSURE_REQUIREMENTS, [plan_id=391]
      +- HashAggregate(keys=[lastname#212], functions=[partial_count(1) AS count#233L])
         +- Project [lastname#212]
            +- Filter (isnotnull(lastname#212) AND ((lastname#212 = Jones) AND (lastname#212 = Jones2)))
               +- Scan ExistingRDD[firstname#210,middlename#211,lastname#212,id#213,gender#214,salary#215]

Фрагмент 2 и тот же физический план

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col
data2 = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema2 = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df2 = spark.createDataFrame(data=data2,schema=schema2)

df2 = df2.filter(col('lastname') == 'Jones')\
       .select('firstname', 'lastname', 'salary')\
       .filter(col('lastname') == 'Jones2')\
       .groupBy('lastname').count().explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#299], functions=[finalmerge_count(merge count#320L) AS count(1)#315L])
   +- Exchange hashpartitioning(lastname#299, 200), ENSURE_REQUIREMENTS, [plan_id=577]
      +- HashAggregate(keys=[lastname#299], functions=[partial_count(1) AS count#320L])
         +- Project [lastname#299]
            +- Filter (isnotnull(lastname#299) AND ((lastname#299 = Jones) AND (lastname#299 = Jones2)))
               +- Scan ExistingRDD[firstname#297,middlename#298,lastname#299,id#300,gender#301,salary#302]

Второй вопрос - withColumn

Делая это:

df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.withColumn("salary100",col("salary")*100) 
df = df.withColumn("salary200",col("salary")*200).explain()

или через цепочку, дает тот же результат. Т.е. не имеет значения, как вы напишете преобразования. Окончательный физический план – это то, что имеет значение, но эта оптимизация требует дополнительных затрат. Зависит от того, как вы об этом думаете, select — это альтернатива.

== Physical Plan ==
*(1) Project [firstname#399, lastname#401, salary#404, (salary#404 * 100) AS salary100#414, (salary#404 * 200) AS salary200#419]
+- *(1) Filter (isnotnull(lastname#401) AND (lastname#401 = Jones))
   +- *(1) Scan ExistingRDD[firstname#399,middlename#400,lastname#401,id#402,gender#403,salary#404]

Эта статья https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015#:~:text=Summary,number%20of%20transforms%20on%20DataFrame%20 занимает другой взгляд на стоимость оптимизации в отличие от проблемы «прогноза». Я был удивлен, что 3 withColumns вызовет какие-либо проблемы. Если это проблема, то это указывает на дерьмовую оптимизацию. И я подозреваю, что реальная проблема заключается в зацикливании многих функций, а не небольшого количества withColumns.

спасибо за объяснение и дополнительную информацию. Ваш ответ - это то, чего я ожидал, исходя из моего понимания того, что такое ленивая оценка. А как насчет конкретного случая использования withColumn? Действительно ли последовательные операторы withColumn имеют худшую производительность, чем один оператор select?

teejay 29.03.2024 14:58

Спасибо за дополнительную информацию по поводу. withColumn. Итак, похоже, что tl;dr: 1) цепочка или последовательное переназначение не имеют значения; 2) для преобразований в нескольких столбцах выполнение их по отдельности с использованием withColumn влечет за собой небольшие дополнительные накладные расходы на оптимизацию, хотя вам придется сделать много таких преобразований, чтобы это имело существенный эффект; 3) два понятия №1 и №2 ортогональны (я думаю, именно это вы имели в виду в своем комментарии о «расходящихся углах», теперь я понимаю).

teejay 30.03.2024 18:02

@teejay, ты понял. Spark также предстоит внести некоторые улучшения. объяснения трудно читать, как и вывод пользовательского интерфейса. Успех

thebluephantom 30.03.2024 18:03

Я предполагаю, что у вас есть следующие вопросы:

  1. Разница в производительности между отсутствием объединения операций (подход A в вашем вопросе) и объединением операций (подход B в вашем вопросе)?
  2. Разница в производительности между несколькими операторами withColumn() и одним оператором select()?

Теперь ответить на второй вопрос довольно просто. Как отмечено в справочнике по искровому API для withColumn():

Этот метод вводит внутреннюю проекцию. Следовательно, вызов его несколько раз, например, через циклы для добавления нескольких столбцов, может привести к созданию больших планов, которые могут вызвать проблемы с производительностью и даже исключение StackOverflowException. Чтобы избежать этого, используйте select() с несколькими столбцами одновременно.

Я надеюсь, что это проясняет, что в определенных ситуациях (например, в циклах) между withColumn() и select() может быть разница в производительности, и в целом лучше использовать последний. Подробнее о скрытой стоимости withColumn()можно прочитать здесь.

Наконец, переходим к первому вопросу: краткий ответ: нет, между этими двумя подходами нет (незначительной, если вообще имеется) разницы в производительности.

Возможно (поправьте меня, если я ошибаюсь), ваши сомнения возникают из-за того, что отказ от объединения операций с фреймами данных и их отдельное использование создают несколько промежуточных фреймов данных, которые могут вызывать накладные расходы, но то же самое происходит и во время цепочки. Следовательно, они имеют одинаковые физические планы и производительность с незначительными различиями, если таковые имеются.

Итак, единственное отличие, которое остается, — это читабельность.

Другие вопросы по теме