Как оцениваются преобразования pyspark, заключенные в один метод?

Я пытаюсь организовать пару преобразований данных, которые выполняются в pyspark. У меня есть код, подобный приведенному ниже.

def main():
    spark_session = SparkSession\
        .builder\
        .appName(config.SPARK_CONFIG['AppName']) \
        .getOrCreate()
    data = getData(spark_session)

    analytics = Analytics(data)
    analytics.execute_and_save_analytics()

    spark_session.stop()


def getData(spark_session):    
    sqlContext = pyspark.SQLContext(spark_session.sparkContext)
    return sqlContext.read.option('user', user).option('password', pswd)\
        .jdbc('jdbc:sqlserver://' + sqlserver + ':' + port\
        + ';database=' + database, table)


class Analytics():
    def __init__(self, df):
        self.df = df

    def _execute(self):
        df0 = self.df.withColumn('col3', df.col31 + df.col32)
        # df0.persist()
        df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
        df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
        return df1, df2

    def execute_and_save_analytics(self):
        output_df1, output_df2 = self._execute()
        output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
        output_df2.coalesce(1).write.csv('/path/file.csv', header='true')

Как я могу реорганизовать код таким образом, чтобы df0 оценивался только один раз? Я попытался использовать persist (), как в закомментированной строке, но улучшения производительности не было. Есть предположения?

Другая, но похожая проблема: как бы вы организовали свои конвейеры, если бы у вас был не один _execute (), а много похожих методов _execute1 (), _execute2 () и т. д. Я полагаю, что если я вызываю методы _execute () отдельно, тогда PySpark будет оценивать каждый конвейер преобразований отдельно (?), Поэтому я теряю производительность.

edit: Данные преобразования (filter, groupBy, count) являются только примерами, я ищу решение, работающее с любым типом преобразований или определением col3.

edit2: Кажется, что вызов cache () в init - лучшая оптимизация здесь.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
50
1

Ответы 1

Как бы то ни было (с закомментированным persist) df0 в любом случае будет оцениваться дважды. Структура вашего кода никак не повлияет.

Разделение вашего кода на

def _execute_1(self):
    df0 = self.df.withColumn('col3', df.col31 + df.col32)
    df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
    return df1

def _execute_2(self):
    df0 = self.df.withColumn('col3', df.col31 + df.col32)
    df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
    return df2

не будет иметь никакого значения. Не вдаваясь в подробности о гарантиях cache, вы можете:

def __init__(self, df):
    self.df = df.withColumn('col3', df.col31 + df.col32).cache()

def _execute_1(self):
    return df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()

def _execute_2(self):
    return df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()

def execute_and_save_analytics(self):
    output_df1 = self._execute_1()
    output_df2 = self._execute_2()
    output_df1.coalesce(1).write.csv('/path/file1.csv', header='true')
    output_df2.coalesce(1).write.csv('/path/file2.csv', header='true')
    self.df.unpersist()

но было бы проще:

(self.df
  .withColumn('col3', df.col31 + df.col32 > 10)
  .repartition("col3")
  .write.partitionBy("col3")
  .write.csv('/path/file.csv', header='true'))

Спасибо за ответ. Начиная с конца: данные преобразования и определение col3 являются лишь примерами, я ищу метод, который работал бы с любым типом преобразований и любым количеством различных методов _execute (). Отредактирую свой вопрос, чтобы уточнить. Я ожидаю, что cache () (или persist ()) в в этом должен выполнить свою работу, я проверю это. Однако я бы предпочел ничего не кэшировать, пока не будет вызван конкретный _execute ().

Daniel R 02.05.2018 22:08

Таблицы кэшируются лениво. Если вы никогда не выполните действие, кеширование никогда не произойдет.

Alper t. Turker 02.05.2018 23:13

Большой! не знал, тогда воспользуюсь :)

Daniel R 03.05.2018 15:34

Другие вопросы по теме