У меня есть несколько DataFramess, и, наконец, я пишу эти DF в дельта-таблицах.
Есть 5 фреймов данных, которые мне нужно записать в 5 дельта-таблиц параллельно. Можем ли мы сделать это в одной тетради?
Я пишу вывод следующим образом:
query_a_b = metadata_df1.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint/event_hub_df") \
.outputMode("append") \
.start("Tables/metadata_df1")
query_a_c = state_df.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint1/event_hub_df") \
.outputMode("append") \
.start("Tables/state_df")
query_a_d = cols.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint2/event_hub_df") \
.outputMode("append") \
.start("Tables/cols")
query_a_e = metadata_df2.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint3/event_hub_df") \
.outputMode("append") \
.start("Tables/metadata_df2")
query_a_f = metadata_df3.writeStream \
.format("delta") \
.option("checkpointLocation", "Files/checkpoint4/event_hub_df") \
.outputMode("append") \
.start("Tables/metadata_df3")
Аналогично, у меня есть 30 DataFrames, которые нужно написать параллельно с 30 дельта-таблицами.
Это будет писаться параллельно?
Вы можете запустить любое количество запросов в одном SparkSession. Все они будут работать одновременно, совместно используя ресурсы кластера.
Вы можете использовать sparkSession.streams()
, чтобы получить StreamingQueryManager (документация Scala / Java / Python), который можно использовать для управления текущими активными запросами.
spark = ... # spark session
spark.streams.active # get the list of currently active streaming queries
spark.streams.get(id) # get a query object by its unique id
spark.streams.awaitAnyTermination() # block until any one of them terminates
Вы можете использовать аналогичный подход, если у вас есть 30 DataFrames, которые вам нужно параллельно записать в 30 дельта-таблиц. Просто создайте 30 отдельных запросов writeStream
, по одному для каждого DataFrame.