У меня возникла проблема с записью более 1 файла на раздел в Iceberg.
Вот моя команда записи:
df.repartition(
partitions, col("exposure_id"), col("event_date"), col("advertising_id"))
.sortWithinPartitions(col("advertising_id"), col("timestamp"))
.writeTo(fullTableName)
.append()
Это работает, если мои данные имеют только один раздел для записи, но если мои данные содержат несколько разделов, происходит сбой с этой ошибкой:
Caused by: java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'exposure_id=10/event_date=2024-06-28' in spec [
1000: exposure_id: identity(13)
1001: event_date: identity(14)
]
Удаление рекламного_id из оператора перераспределения заставляет его работать, но производительность ужасна, поскольку в каждом из моих разделов содержится много данных, которые в конечном итоге попадают в один исполнитель для каждого раздела.
Как я могу записать несколько файлов в айсберг на один раздел, чтобы он не сошел с ума?
Для записи более чем в один раздел на файл необходимо включить разветвление во время создания таблицы.
df.writeTo(table)
.tableProperty("location", location)
.tableProperty("write.spark.fanout.enabled", "true")
.partitionedBy(cols)
.using("iceberg")
.create()