Как обработать большую дельта-таблицу с помощью UDF?

У меня есть дельта-таблица с примерно 300 миллиардами строк. Теперь я выполняю некоторые операции со столбцом, используя UDF, и создаю другой столбец.

Мой код примерно такой

def my_udf(data):
    return pass
       
 
udf_func = udf(my_udf, StringType())
data = spark.sql("""SELECT * FROM large_table """)
data = data.withColumn('new_column', udf_func(data.value))

Проблема сейчас в том, что это займет много времени, так как Spark обработает все 300 миллиардов строк, а затем запишет вывод. Есть ли способ, с помощью которого мы можем выполнять пакетную обработку Mirco и регулярно записывать их вывод в выходную дельта-таблицу?

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
0
48
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Первое правило обычно состоит в том, чтобы максимально избегать UDF — какое преобразование вам нужно выполнить, чего нет в самой Spark?

Второе правило - если вы не можете избежать использования UDF, по крайней мере, используйте Пользовательские функции Pandas, которые обрабатывают данные в пакетах и ​​не имеют таких больших накладных расходов на сериализацию/десериализацию - обычные UDF обрабатывают данные построчно, кодируя и декодируя данные для каждого из их.

Если ваша таблица создавалась с течением времени и состоит из множества файлов, вы можете попробовать использовать Spark Structured Streaming с Trigger.AvailableNow (требуется DBR 10.3 или 10.4), примерно так:

maxNumFiles = 10 # max number of parquet files processed at once
df = spark.readStream \
  .option("maxFilesPerTrigger", maxNumFiles) \ 
  .table("large_table")
df = df.withColumn('new_column', udf_func(data.value))
df.writeStream \
  .option("checkpointLocation", "/some/path") \
  .trigger(availableNow=True) \
  .toTable("my_destination_table")

это будет читать исходную таблицу по частям, применять ваше преобразование и записывать данные в целевую таблицу.

Я декодирую двоичный файл Thrift в JSON, поэтому создал для него UDF.

John Constantine 25.03.2022 05:10

Используйте хотя бы PandasUDF — это значительно быстрее

Alex Ott 25.03.2022 08:42

Если я использую Pandas UDF, это обрабатывается в пакетном режиме. Но поскольку мы читаем таблицу в потоке, не будет ли сведено на нет преимущество pandas udf?

John Constantine 28.03.2022 14:26

Нет. UDF Pandas работают фрагментами по 10 000 записей (по умолчанию), в этом случае нет различий между пакетом и потоком, поскольку потоковая передача в любом случае является микропакетом.

Alex Ott 28.03.2022 14:33

Но внутри моей Pandas UDF мне все еще нужно сделать map, так как мне нужно получить доступ к каждой строке по отдельности, имеет ли смысл использовать Pandas UDF?

John Constantine 28.03.2022 14:38

Все дело в массовой сериализации и десериализации, а не построчно.

Alex Ott 28.03.2022 15:25

Другие вопросы по теме