Я новичок в PySpark. Я понимаю, что он использует отложенную оценку, а это означает, что выполнение серии преобразований будет отложено до тех пор, пока не будет запрошено какое-либо действие, после чего движок Spark оптимизирует весь набор преобразований, а затем выполнит их.
Поэтому как с функциональной точки зрения, так и с точки зрения производительности я ожидаю следующего:
Подход А
df = spark.read.parquet(path)
df = df.filter(F.col('state') == 'CA')
df = df.select('id', 'name', 'subject')
df = df.groupBy('subject').count()
быть таким же, как это:
Подход Б
df = spark.read.parquet(path)\
.filter(F.col('state') == 'CA')\
.select('id', 'name', 'subject')\
.groupBy('subject').count()
Мне нравится стиль подхода А, потому что он позволяет мне разбить мою логику на более мелкие (и повторно используемые) функции.
Однако я наткнулся на несколько сообщений в блоге (например, здесь , здесь и здесь), которые меня смутили. Эти сообщения посвящены использованию последовательных операторов withColumn
и предлагают вместо этого использовать одиночный select
. Основная причина, по-видимому, заключается в том, что, поскольку фреймы данных неизменяемы, последовательное использование withColumn
вредно для производительности.
Итак, мой вопрос: возникнет ли у меня такая же проблема с производительностью при использовании подхода А? Или это просто проблема, специфичная для withColumn
?
Строго говоря, я ответил на вопрос. На самом деле у вас есть два вопроса под разными углами зрения.
@thebluephantom извини, я не понял: «На самом деле у тебя есть два вопроса, которые под разными углами зрения». Что вы подразумеваете под «расходящимися углами»? Могу ли я предоставить дополнительную информацию, которая поможет прояснить вопрос?
Ничего страшного. Я подробнее рассмотрю этот вопрос позже.
Добавил в ответ. Второй ответ на самом деле,
Подход А:
Подход Б:
Вместо использования последовательных операторов withColumn вы можете использовать один оператор выбора для достижения того же результата.
Он выполняет преобразования за один проход через данные.
Каждая операция в цепочке применяется непосредственно к DataFrame
возвращается предыдущей операцией без создания промежуточного
DataFrames.
Подход B обычно предпочтительнее из-за его большей производительности и более чистого и лаконичного кода.
Производительность равная.
Собственно есть 2 вопроса.
Первый вопрос с образцами кодов. В отличие от первого ответа, я не согласен.
Попробуйте оба сегмента кода со знаком .explain()
, и вы увидите, что сгенерированный физический план выполнения абсолютно одинаков.
Spark основан на lazy evaluation
. То есть:
Все преобразования в Spark являются ленивыми, поскольку они не вычисляют их результаты сразу. Вместо этого они просто помнят преобразования, примененные к некоторому базовому набору данных (например, файлу). преобразования вычисляются только тогда, когда действие требует результата для вернуться в программу драйвера. Эта конструкция позволяет Spark работать более эффективно. Например, мы можем понять, что набор данных, созданный Through Map будет использоваться при сокращении и возвращать только результат сократить до драйвера, а не до большего сопоставленного набора данных.
В результате всего этого я запустил код, аналогичный вашему, с двумя примененными фильтрами, и обратите внимание, что, поскольку действие .count
вызывает своевременную оценку, Catalyst отфильтровывает данные на основе как первого, так и второго фильтра. Это известно как "Code Fusing"
— что можно сделать для позднего выполнения, т. е. ленивой оценки.
Фрагмент 1 и физический план
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col
data = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.filter(col('lastname') == 'Jones2')
df = df.groupBy('lastname').count().explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#212], functions=[finalmerge_count(merge count#233L) AS count(1)#228L])
+- Exchange hashpartitioning(lastname#212, 200), ENSURE_REQUIREMENTS, [plan_id=391]
+- HashAggregate(keys=[lastname#212], functions=[partial_count(1) AS count#233L])
+- Project [lastname#212]
+- Filter (isnotnull(lastname#212) AND ((lastname#212 = Jones) AND (lastname#212 = Jones2)))
+- Scan ExistingRDD[firstname#210,middlename#211,lastname#212,id#213,gender#214,salary#215]
Фрагмент 2 и тот же физический план
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import col
data2 = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)
]
schema2 = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df2 = spark.createDataFrame(data=data2,schema=schema2)
df2 = df2.filter(col('lastname') == 'Jones')\
.select('firstname', 'lastname', 'salary')\
.filter(col('lastname') == 'Jones2')\
.groupBy('lastname').count().explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[lastname#299], functions=[finalmerge_count(merge count#320L) AS count(1)#315L])
+- Exchange hashpartitioning(lastname#299, 200), ENSURE_REQUIREMENTS, [plan_id=577]
+- HashAggregate(keys=[lastname#299], functions=[partial_count(1) AS count#320L])
+- Project [lastname#299]
+- Filter (isnotnull(lastname#299) AND ((lastname#299 = Jones) AND (lastname#299 = Jones2)))
+- Scan ExistingRDD[firstname#297,middlename#298,lastname#299,id#300,gender#301,salary#302]
Второй вопрос - withColumn
Делая это:
df = df.filter(col('lastname') == 'Jones')
df = df.select('firstname', 'lastname', 'salary')
df = df.withColumn("salary100",col("salary")*100)
df = df.withColumn("salary200",col("salary")*200).explain()
или через цепочку, дает тот же результат. Т.е. не имеет значения, как вы напишете преобразования. Окончательный физический план – это то, что имеет значение, но эта оптимизация требует дополнительных затрат. Зависит от того, как вы об этом думаете, select
— это альтернатива.
== Physical Plan ==
*(1) Project [firstname#399, lastname#401, salary#404, (salary#404 * 100) AS salary100#414, (salary#404 * 200) AS salary200#419]
+- *(1) Filter (isnotnull(lastname#401) AND (lastname#401 = Jones))
+- *(1) Scan ExistingRDD[firstname#399,middlename#400,lastname#401,id#402,gender#403,salary#404]
Эта статья https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015#:~:text=Summary,number%20of%20transforms%20on%20DataFrame%20 занимает другой взгляд на стоимость оптимизации в отличие от проблемы «прогноза». Я был удивлен, что 3 withColumns вызовет какие-либо проблемы. Если это проблема, то это указывает на дерьмовую оптимизацию. И я подозреваю, что реальная проблема заключается в зацикливании многих функций, а не небольшого количества withColumns.
спасибо за объяснение и дополнительную информацию. Ваш ответ - это то, чего я ожидал, исходя из моего понимания того, что такое ленивая оценка. А как насчет конкретного случая использования withColumn
? Действительно ли последовательные операторы withColumn
имеют худшую производительность, чем один оператор select
?
Спасибо за дополнительную информацию по поводу. withColumn
. Итак, похоже, что tl;dr: 1) цепочка или последовательное переназначение не имеют значения; 2) для преобразований в нескольких столбцах выполнение их по отдельности с использованием withColumn
влечет за собой небольшие дополнительные накладные расходы на оптимизацию, хотя вам придется сделать много таких преобразований, чтобы это имело существенный эффект; 3) два понятия №1 и №2 ортогональны (я думаю, именно это вы имели в виду в своем комментарии о «расходящихся углах», теперь я понимаю).
@teejay, ты понял. Spark также предстоит внести некоторые улучшения. объяснения трудно читать, как и вывод пользовательского интерфейса. Успех
Я предполагаю, что у вас есть следующие вопросы:
withColumn()
и одним оператором select()
?Теперь ответить на второй вопрос довольно просто. Как отмечено в справочнике по искровому API для withColumn():
Этот метод вводит внутреннюю проекцию. Следовательно, вызов его несколько раз, например, через циклы для добавления нескольких столбцов, может привести к созданию больших планов, которые могут вызвать проблемы с производительностью и даже исключение StackOverflowException. Чтобы избежать этого, используйте select() с несколькими столбцами одновременно.
Я надеюсь, что это проясняет, что в определенных ситуациях (например, в циклах) между withColumn()
и select()
может быть разница в производительности, и в целом лучше использовать последний. Подробнее о скрытой стоимости withColumn()
можно прочитать здесь.
Наконец, переходим к первому вопросу: краткий ответ: нет, между этими двумя подходами нет (незначительной, если вообще имеется) разницы в производительности.
Возможно (поправьте меня, если я ошибаюсь), ваши сомнения возникают из-за того, что отказ от объединения операций с фреймами данных и их отдельное использование создают несколько промежуточных фреймов данных, которые могут вызывать накладные расходы, но то же самое происходит и во время цепочки. Следовательно, они имеют одинаковые физические планы и производительность с незначительными различиями, если таковые имеются.
Итак, единственное отличие, которое остается, — это читабельность.
Обновленный ответ, то же самое. Попробуй это.