Я пытаюсь организовать пару преобразований данных, которые выполняются в pyspark. У меня есть код, подобный приведенному ниже.
def main():
spark_session = SparkSession\
.builder\
.appName(config.SPARK_CONFIG['AppName']) \
.getOrCreate()
data = getData(spark_session)
analytics = Analytics(data)
analytics.execute_and_save_analytics()
spark_session.stop()
def getData(spark_session):
sqlContext = pyspark.SQLContext(spark_session.sparkContext)
return sqlContext.read.option('user', user).option('password', pswd)\
.jdbc('jdbc:sqlserver://' + sqlserver + ':' + port\
+ ';database=' + database, table)
class Analytics():
def __init__(self, df):
self.df = df
def _execute(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
# df0.persist()
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df1, df2
def execute_and_save_analytics(self):
output_df1, output_df2 = self._execute()
output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file.csv', header='true')
Как я могу реорганизовать код таким образом, чтобы df0 оценивался только один раз? Я попытался использовать persist (), как в закомментированной строке, но улучшения производительности не было. Есть предположения?
Другая, но похожая проблема: как бы вы организовали свои конвейеры, если бы у вас был не один _execute (), а много похожих методов _execute1 (), _execute2 () и т. д. Я полагаю, что если я вызываю методы _execute () отдельно, тогда PySpark будет оценивать каждый конвейер преобразований отдельно (?), Поэтому я теряю производительность.
edit: Данные преобразования (filter, groupBy, count) являются только примерами, я ищу решение, работающее с любым типом преобразований или определением col3.
edit2: Кажется, что вызов cache () в init - лучшая оптимизация здесь.
Как бы то ни было (с закомментированным persist
) df0
в любом случае будет оцениваться дважды. Структура вашего кода никак не повлияет.
Разделение вашего кода на
def _execute_1(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
return df1
def _execute_2(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df2
не будет иметь никакого значения. Не вдаваясь в подробности о гарантиях cache
, вы можете:
def __init__(self, df):
self.df = df.withColumn('col3', df.col31 + df.col32).cache()
def _execute_1(self):
return df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
def _execute_2(self):
return df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
def execute_and_save_analytics(self):
output_df1 = self._execute_1()
output_df2 = self._execute_2()
output_df1.coalesce(1).write.csv('/path/file1.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file2.csv', header='true')
self.df.unpersist()
но было бы проще:
(self.df
.withColumn('col3', df.col31 + df.col32 > 10)
.repartition("col3")
.write.partitionBy("col3")
.write.csv('/path/file.csv', header='true'))
Таблицы кэшируются лениво. Если вы никогда не выполните действие, кеширование никогда не произойдет.
Большой! не знал, тогда воспользуюсь :)
Спасибо за ответ. Начиная с конца: данные преобразования и определение col3 являются лишь примерами, я ищу метод, который работал бы с любым типом преобразований и любым количеством различных методов _execute (). Отредактирую свой вопрос, чтобы уточнить. Я ожидаю, что cache () (или persist ()) в в этом должен выполнить свою работу, я проверю это. Однако я бы предпочел ничего не кэшировать, пока не будет вызван конкретный _execute ().