val df1= spark.read.format("delta").table("...100K_rows...")
val xform = udf ( (message: String) => {
// abstract transformation, ideally this comes from a .jar library
// such as: (abstract) https://github.com/cosmycx/scala_transformer
val t0 = System.currentTimeMillis
Thread.sleep(5)
System.currentTimeMillis - t0
}) // .xformText
spark.udf.register("xform", xform)
val df2= df1.withColumn("xformResult", xform($"SomeText"))
df2.write.format("delta")
.mode(SaveMode.Overwrite)
.saveAsTable("...")
Как это можно заставить работать быстрее?
Что я пробовал:
Результаты всегда находятся в этом диапазоне: 1 минута для 10 тысяч строк и 8 минут для 100 тысяч строк независимо от изменений.
Идеальные результаты были бы менее 1 минуты для 100K+. будет ли это вообще достижимо в искре databricks? Это работает в Azure, если это имеет значение.
Что я упускаю, что еще нужно учитывать, попробовать? Спасибо.
Во-первых, это, вероятно, не вариант использования искры. Набор данных настолько мал, что оптимизатор искры даже не знает, что с ним делать.
Что происходит, так это то, что файл небольшой и читается одной задачей, spark определяет преобразование как простое и применяет его при чтении данных, поэтому вы обрабатываете вещи последовательно и ждете всего 8 минут Thread.sleep()
Лучше всего, вероятно, не использовать искру для этого, но если вы попытаетесь заставить искру выполнить перетасовку (сортировку или что-то еще) и убедиться, что ваш код запускается после перетасовки, тогда у вас будет x (200 по умолчанию) разделов и может работать с разными задачами (но опять же - я бы не стал использовать для этого Spark)
Тот факт, что вы используете UDF здесь, также важен, потому что UDF — самые дорогие функции в spark, потому что движок не может применить к ним оптимизацию.
Обычно рекомендуется иметь меньше больших файлов, но в вашем случае, если у вас есть несколько файлов меньшего размера, это позволит искре читать их одновременно.
Преобразование происходит последовательно, а не параллельно, потому что df1 DataFrame имеет только один раздел. Перераспределение исходного DataFrame и последующее преобразование нового секционированного DataFrame значительно повышает скорость и производительность как минимум в 10 раз.
println(df1.rdd.getNumPartitions) // 1
val df2 = df1.repartition(20) // run transform on df2 (100K rows in 35 sec.)
Я думаю, что что-то вроде этого также является проблемой, искра выполняет работу последовательно и не распространяется на рабочие узлы. Я попытаюсь отсортировать полученный df1 в каком-то столбце, а затем применить преобразование... если я правильно понимаю. Что касается варианта использования, то 100 000 строк, скажем, каждые пару часов могут довольно быстро добавить до миллионов++ строк данных. Я предполагаю, что можно было бы выполнять потоковую передачу с чем-то еще быстрее, чем использовать искру только в хвостовой части только для кумулятивных данных...