Кэширование и циклы в (Py)Spark

Я понимаю, что при использовании Spark обычно следует избегать циклов for и while. Мой вопрос касается оптимизации цикла «пока», хотя, если мне не хватает решения, которое делает его ненужным, я все слушаю.

Я не уверен, что смогу продемонстрировать проблему (очень долгое время обработки, усугубляющееся по ходу цикла) с игрушечными данными, но вот некоторый псевдокод:

### I have a function - called 'enumerator' - which involves several joins and window functions. 
# I run this function on my base dataset, df0, and return df1
df1 = enumerator(df0, param1 = apple, param2 = banana)

# Check for some condition in df1, then count number of rows in the result
counter = df1 \
.filter(col('X') == some_condition) \
.count()

# If there are rows meeting this condition, start a while loop
while counter > 0:
  print('Starting with counter: ', str(counter))
  
  # Run the enumerator function on df1 again
  df2 = enumerator(df1, param1= apple, param2 = banana)
  
  # Check for the condition again, then continue the while loop if necessary
  counter = df2 \
  .filter(col('X') == some_condition) \
  .count()
  
  df1 = df2

# After the while loop finishes, I take the last resulting dataframe and I will do several more operations and analyses downstream  
final_df = df2

Существенным аспектом функции перечислителя является «оглядывание назад» на последовательность в окне, поэтому может потребоваться несколько прогонов, прежде чем будут сделаны все необходимые исправления.

В глубине души я знаю, что это уродливо, но оконный/ранжирующий/последовательный анализ внутри функции имеет решающее значение. Насколько я понимаю, базовый план запроса Spark становится все более и более запутанным по мере продолжения цикла. Есть ли какие-либо лучшие практики, которые я должен использовать в этой ситуации? Должен ли я кэшировать в любой момент - либо до запуска цикла while, либо внутри самого цикла?

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
2 122
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы обязательно должны кэшировать/сохранять фреймы данных, иначе каждая итерация в цикле while будет начинаться с нуля с df0. Также вы можете отменить сохранение используемых фреймов данных, чтобы освободить место на диске/памяти.

Еще один момент оптимизации — не делать count, а использовать более дешевую операцию, например df.take(1). Если это ничего не возвращает, то counter == 0.

df1 = enumerator(df0, param1 = apple, param2 = banana)
df1.cache()

# Check for some condition in df1, then count number of rows in the result
counter = len(df1.filter(col('X') == some_condition).take(1))

while counter > 0:
  print('Starting with counter: ', str(counter))
  
  df2 = enumerator(df1, param1 = apple, param2 = banana)
  df2.cache()

  counter = len(df2.filter(col('X') == some_condition).take(1))
  df1.unpersist()    # unpersist df1 as it will be overwritten
  
  df1 = df2

final_df = df2

Спасибо! использование '.take(1)' для этих сценариев, то, что я раньше не рассматривал, но кажется таким очевидным. Я подумал, что, возможно, планировщик запросов Spark в конечном итоге сделает то же самое под капотом, если я буду использовать условие типа «счетчик > 0» (остановится, как только достигнет 1)

mcharl02 15.12.2020 22:38

Другие вопросы по теме