Я новичок в использовании hudi, и у меня есть проблема. Я работаю с EMR в AWS с pyspark, Kafka, и я хочу прочитать тему из кластера Kafka с потоковой передачей pyspark, а затем перенести ее на S3 в формате hudi. Честно говоря, я много пробовал с тех пор, как несколько недель назад, и я не знаю, возможно ли это. Может кто-нибудь сказать мне помочь, пожалуйста? Код, с которым я работаю:
#Reading
df_T = spark.readStream \
.format("kafka") \
.options(**options_read) \
.option("subscribe", topic) \
.load()
....
hudi_options = {
'hoodie.table.name': MyTable,
'hoodie.datasource.write.table.name': MyTable,
'hoodie.datasource.write.recordkey.field': MyKeyInTable,
'hoodie.datasource.write.partitionpath.field': MyPartitionKey,
'hoodie.datasource.write.hive_style_partitioning': "true",
'hoodie.datasource.write.row.writer.enable': "false",
'hoodie.datasource.write.operation': 'bulk_insert',
'hoodie.datasource.write.precombine.field': MyTimeStamp,
'hoodie.insert.shuffle.parallelism': 1,
'hoodie.consistency.check.enabled': "true",
'hoodie.cleaner.policy': "KEEP_LATEST_COMMITS",
'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
'hoodie.compact.inline': "false",
'hoodie.datasource.hive_sync.table': MyTable,
'hoodie.datasource.hive_sync.partition_fields': MyPartitionKey,
'hoodie.datasource.hive_sync.database' : Mydatabase,
'hoodie.datasource.hive_sync.auto_create_database': "true",
'hoodie.datasource.write.keygenerator.class': "org.apache.hudi.keygen.ComplexKeyGenerator",
'hoodie.datasource.hive_sync.partition_extractor_class': "org.apache.hudi.hive.MultiPartKeysValueExtractor",
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.skip_ro_suffix': 'true'
}
....
ds = df_T \
.writeStream \
.outputMode('append') \
.format("org.apache.hudi") \
.options(**hudi_options)\
.option('checkpointLocation', MyCheckpointLocation) \
.start(MyPathLocation) \
.awaitTermination(300)
....
Этот код в EMR говорит, что работает нормально, но когда я собираюсь искать файлы hudi, он их не создает. Я знаю, что конфигурация kafka работает, потому что когда в режиме вывода я устанавливаю «консоль», она работает нормально, может ли кто-нибудь мне помочь?






Привет, ребята, я мог бы исправить эту ошибку, прежде всего вам нужно очистить фреймворк данных, не все, но, по крайней мере, все поля, которые у вас есть в первичных ключах в таблицах, равны нулю. Во-вторых, в hoodie.datasource.write.precombine.field вы можете установить
...
import datetime
currentDate = datetime.datetime.now()
#As for example:
hudi_options = {
...
'hoodie.datasource.write.precombine.field': currentDate,
...
}
Наконец, если у вас нет временной метки в вашем фрейме данных, вы можете установить это:
.withColumn('Loaded_Date', F.lit(currentDate).cast('timestamp'))