У меня есть задание pyspark, работающее на Glue. Моя работа обрабатывает данные и сохраняет их как Apache Iceberg. Проблема в том, что таблица сохранения генерирует несколько небольших файлов внутри разделов. Я протестировал несколько способов сохранения данных, но ни один из них не помог. Ниже следует мой вырезанный код.
import pyspark.sql.functions as f
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import DataFrame
conf = (
SparkConf()
.setAppName(APP_NAME)
.set(
"spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
)
.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
.set("spark.sql.catalog.glue_catalog.warehouse", BRONZE_PATH)
.set(
"spark.sql.catalog.glue_catalog.catalog-impl",
"org.apache.iceberg.aws.glue.GlueCatalog",
)
.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
.set("spark.sql.shuffle.partitions", "100")
)
sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
glue_db = glueContext.create_dynamic_frame.from_catalog(database=DATABASE_NAME, table_name=LANDING_TABLE_NAME)
df = glue_db.toDF()
df.createOrReplaceTempView(APP_NAME)
# do all processing here...
df = df.sortWithinPartitions("issueid")
(
df.writeTo(f"glue_catalog.bronze_{ENVIRONMENT}.{BRONZE_TABLE_NAME}")
.using("iceberg")
.tableProperty("format-version", "2")
.tableProperty("location", BRONZE_PATH + BRONZE_TABLE_OUTPUT)
.tableProperty("write.distribution.mode", "hash")
.tableProperty("write.target-file-size-bytes", "536870912")
.partitionedBy("issueid")
.createOrReplace()
)
Мой вывод показан ниже:
Желаемый результат: один сжатый файл на раздел.
Как я могу этого добиться?
У меня сработало перераспределение:
df = df.repartition("issueid").sortWithinPartitions("issueid")
Не уверен, что это лучшее решение. Пожалуйста, не стесняйтесь улучшать его.