Я понимаю, что при использовании Spark обычно следует избегать циклов for и while. Мой вопрос касается оптимизации цикла «пока», хотя, если мне не хватает решения, которое делает его ненужным, я все слушаю.
Я не уверен, что смогу продемонстрировать проблему (очень долгое время обработки, усугубляющееся по ходу цикла) с игрушечными данными, но вот некоторый псевдокод:
### I have a function - called 'enumerator' - which involves several joins and window functions.
# I run this function on my base dataset, df0, and return df1
df1 = enumerator(df0, param1 = apple, param2 = banana)
# Check for some condition in df1, then count number of rows in the result
counter = df1 \
.filter(col('X') == some_condition) \
.count()
# If there are rows meeting this condition, start a while loop
while counter > 0:
print('Starting with counter: ', str(counter))
# Run the enumerator function on df1 again
df2 = enumerator(df1, param1= apple, param2 = banana)
# Check for the condition again, then continue the while loop if necessary
counter = df2 \
.filter(col('X') == some_condition) \
.count()
df1 = df2
# After the while loop finishes, I take the last resulting dataframe and I will do several more operations and analyses downstream
final_df = df2
Существенным аспектом функции перечислителя является «оглядывание назад» на последовательность в окне, поэтому может потребоваться несколько прогонов, прежде чем будут сделаны все необходимые исправления.
В глубине души я знаю, что это уродливо, но оконный/ранжирующий/последовательный анализ внутри функции имеет решающее значение. Насколько я понимаю, базовый план запроса Spark становится все более и более запутанным по мере продолжения цикла. Есть ли какие-либо лучшие практики, которые я должен использовать в этой ситуации? Должен ли я кэшировать в любой момент - либо до запуска цикла while, либо внутри самого цикла?
Вы обязательно должны кэшировать/сохранять фреймы данных, иначе каждая итерация в цикле while
будет начинаться с нуля с df0
. Также вы можете отменить сохранение используемых фреймов данных, чтобы освободить место на диске/памяти.
Еще один момент оптимизации — не делать count
, а использовать более дешевую операцию, например df.take(1)
. Если это ничего не возвращает, то counter == 0
.
df1 = enumerator(df0, param1 = apple, param2 = banana)
df1.cache()
# Check for some condition in df1, then count number of rows in the result
counter = len(df1.filter(col('X') == some_condition).take(1))
while counter > 0:
print('Starting with counter: ', str(counter))
df2 = enumerator(df1, param1 = apple, param2 = banana)
df2.cache()
counter = len(df2.filter(col('X') == some_condition).take(1))
df1.unpersist() # unpersist df1 as it will be overwritten
df1 = df2
final_df = df2
Спасибо! использование '.take(1)' для этих сценариев, то, что я раньше не рассматривал, но кажется таким очевидным. Я подумал, что, возможно, планировщик запросов Spark в конечном итоге сделает то же самое под капотом, если я буду использовать условие типа «счетчик > 0» (остановится, как только достигнет 1)