Как сделать блокнот Scala databricks на Spark более быстрым и производительным

val df1= spark.read.format("delta").table("...100K_rows...")

val xform = udf ( (message: String) => {
 // abstract transformation, ideally this comes from a .jar library
 // such as: (abstract) https://github.com/cosmycx/scala_transformer
  val t0 = System.currentTimeMillis
  Thread.sleep(5)  
  System.currentTimeMillis - t0 
}) // .xformText 
spark.udf.register("xform", xform)

val df2= df1.withColumn("xformResult", xform($"SomeText"))

df2.write.format("delta")
    .mode(SaveMode.Overwrite)
    .saveAsTable("...")

Как это можно заставить работать быстрее?

Что я пробовал:

  • увеличить размер узла искрового кластера databricks: DS3_v2 14 ГБ 4 ядра по сравнению с DS5_v2 56 ГБ 16 ядер
  • увеличьте число работников искрового кластера databricks, драйвер плюс: 3, 5 и 10 (та же скорость!?)
  • изменение: spark.conf.set("spark.sql.shuffle.partitions", "auto") или разные значения

Результаты всегда находятся в этом диапазоне: 1 минута для 10 тысяч строк и 8 минут для 100 тысяч строк независимо от изменений.

Идеальные результаты были бы менее 1 минуты для 100K+. будет ли это вообще достижимо в искре databricks? Это работает в Azure, если это имеет значение.

Что я упускаю, что еще нужно учитывать, попробовать? Спасибо.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
64
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Во-первых, это, вероятно, не вариант использования искры. Набор данных настолько мал, что оптимизатор искры даже не знает, что с ним делать.

Что происходит, так это то, что файл небольшой и читается одной задачей, spark определяет преобразование как простое и применяет его при чтении данных, поэтому вы обрабатываете вещи последовательно и ждете всего 8 минут Thread.sleep()

Лучше всего, вероятно, не использовать искру для этого, но если вы попытаетесь заставить искру выполнить перетасовку (сортировку или что-то еще) и убедиться, что ваш код запускается после перетасовки, тогда у вас будет x (200 по умолчанию) разделов и может работать с разными задачами (но опять же - я бы не стал использовать для этого Spark)

Я думаю, что что-то вроде этого также является проблемой, искра выполняет работу последовательно и не распространяется на рабочие узлы. Я попытаюсь отсортировать полученный df1 в каком-то столбце, а затем применить преобразование... если я правильно понимаю. Что касается варианта использования, то 100 000 строк, скажем, каждые пару часов могут довольно быстро добавить до миллионов++ строк данных. Я предполагаю, что можно было бы выполнять потоковую передачу с чем-то еще быстрее, чем использовать искру только в хвостовой части только для кумулятивных данных...

cosmycx 02.04.2022 04:50

Тот факт, что вы используете UDF здесь, также важен, потому что UDF — самые дорогие функции в spark, потому что движок не может применить к ним оптимизацию.

intruderr 02.04.2022 10:05

Обычно рекомендуется иметь меньше больших файлов, но в вашем случае, если у вас есть несколько файлов меньшего размера, это позволит искре читать их одновременно.

Arnon Rotem-Gal-Oz 02.04.2022 10:14
Ответ принят как подходящий

Преобразование происходит последовательно, а не параллельно, потому что df1 DataFrame имеет только один раздел. Перераспределение исходного DataFrame и последующее преобразование нового секционированного DataFrame значительно повышает скорость и производительность как минимум в 10 раз.

println(df1.rdd.getNumPartitions) // 1

val df2 = df1.repartition(20) // run transform on df2 (100K rows in 35 sec.)

Другие вопросы по теме