Мы используем кластер блоков данных, который отключается после 30 минут бездействия (13.3 LTS (включая Apache Spark 3.4.1, Scala 2.12)). Моя цель — прочитать таблицу красного смещения и записать ее в снежинку. Я использую следующий код:
df = spark.read \
.format("redshift") \
.option("url", jdbc_url) \
.option("user", user) \
.option("password", password) \
.option("dbtable", "schem_name.table_name") \
.option("partitionColumn", "date_col1")\
.option("lowerBound", "2023-11-05")\
.option("upperBound", "2024-03-23")\
.option("numPartitions", "500")\
.load()\
.filter("date_col1>dateadd(month ,-6,current_date)")\
.filter(col("country_col").isin('India', 'China', 'Japan', 'Germany', 'United Kingdom', 'Brazil', 'United States', 'Canada'))
df1 = df.repartition(900)#Data is skedwed for that partition column, so repartitioning to 1* num cores in cluster for even dist
df1.write.format("snowflake") \
.option("host", host_sfl) \
.option("user", user_sfl) \
.option('role', role_sfl) \
.option("password", password_sfl) \
.option("database", database_sfl) \
.option("sfWarehouse", warehouse_sfl) \
.option("schema",'schema_name')\
.option("dbtable",'target_table_name')\
.mode('Overwrite') \
.save()
It throws the following error, despite not having used query option in my code:
IllegalArgumentException: requirement failed:
Options 'query' and 'partitionColumn' can not be specified together.
Please define the query using `dbtable` option instead and make sure to qualify
the partition columns using the supplied subquery alias to resolve any ambiguity.
Example :
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
Когда я комментирую перераспределение, пишу в код снежинки и просто выполняю подсчет, он дает мне правильный подсчет.
Вот еще одно наблюдение: Если я изменю приведенный выше код на JDBC вместо красного смещения в .format("redshift") после подсчета, код будет работать.
Я не знаю, что здесь происходит. Задание продолжает завершаться сбоем при первом перезапуске кластера, и мне приходится сначала выполнить подсчет вручную и изменить его на JDBC, чтобы он работал. Пожалуйста, дайте мне знать, если я упускаю что-то очевидное. Я просмотрел много документации и не смог найти то, что мне нужно.
Вот что я в итоге сделал, и это сработало.
count_df = spark.read \
.format("com.databricks.spark.redshift") \
.option("dbtable", tbl1) \
.option("url", url) \
.option("user", user) \
.option("password", pwd) \
.load()\
.limit(1)
count_df.count()```
And then the code in the question started working, a dummy count action with a different driver parameter(com.databricks.spark.redshift) before running the code in the question.