У меня есть дельта-таблица с примерно 300 миллиардами строк. Теперь я выполняю некоторые операции со столбцом, используя UDF, и создаю другой столбец.
Мой код примерно такой
def my_udf(data):
return pass
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))
Проблема сейчас в том, что это займет много времени, так как Spark обработает все 300 миллиардов строк, а затем запишет вывод. Есть ли способ, с помощью которого мы можем выполнять пакетную обработку Mirco и регулярно записывать их вывод в выходную дельта-таблицу?
Первое правило обычно состоит в том, чтобы максимально избегать UDF — какое преобразование вам нужно выполнить, чего нет в самой Spark?
Второе правило - если вы не можете избежать использования UDF, по крайней мере, используйте Пользовательские функции Pandas, которые обрабатывают данные в пакетах и не имеют таких больших накладных расходов на сериализацию/десериализацию - обычные UDF обрабатывают данные построчно, кодируя и декодируя данные для каждого из их.
Если ваша таблица создавалась с течением времени и состоит из множества файлов, вы можете попробовать использовать Spark Structured Streaming с Trigger.AvailableNow
(требуется DBR 10.3 или 10.4), примерно так:
maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
.option("maxFilesPerTrigger", maxNumFiles) \
.table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
.option("checkpointLocation", "/some/path") \
.trigger(availableNow=True) \
.toTable("my_destination_table")
это будет читать исходную таблицу по частям, применять ваше преобразование и записывать данные в целевую таблицу.
Используйте хотя бы PandasUDF — это значительно быстрее
Если я использую Pandas UDF, это обрабатывается в пакетном режиме. Но поскольку мы читаем таблицу в потоке, не будет ли сведено на нет преимущество pandas udf?
Нет. UDF Pandas работают фрагментами по 10 000 записей (по умолчанию), в этом случае нет различий между пакетом и потоком, поскольку потоковая передача в любом случае является микропакетом.
Но внутри моей Pandas UDF мне все еще нужно сделать map
, так как мне нужно получить доступ к каждой строке по отдельности, имеет ли смысл использовать Pandas UDF?
Все дело в массовой сериализации и десериализации, а не построчно.
Я декодирую двоичный файл Thrift в JSON, поэтому создал для него UDF.