Я создаю задание ETL в AWS Glue, которое будет извлекать из местоположения S3 самые последние изменения или текущие данные для каждого объекта в репозитории. Данные в репозитории представляют собой исторический отчет обо всех изменениях сущностей. Каждый день я запускаю ETL, и он записывается в другое место S3, т. е. Bucket/path/to/files/current_date/..., где текущая дата является динамической и соответствует дате запуска ETL.
Проблема, с которой я сталкиваюсь, заключается в том, что я не могу удалить программно из S3 (организационные ограничения) или переместить файлы, поскольку это копия и удаление за кулисами, поэтому это также не удается, оставляя единственный путь для обхода клея. Я хотел бы настроить сканер так, чтобы часть пути с датой была динамической, но я не смог найти способ сделать это - кто-нибудь знает, возможно ли это?
Мои данные разделены по run_date (см. текущую дату выше), а также по 6 другим иерархическим разделам. Я создаю сканеры и задания ETL через CloudFormation, язык yaml. Пути для сканеров хранятся в виде параметров ssm, определенных в сценариях CloudFormation.
Пример параметра Path SSM
S3CurrentPath:
Type: AWS::SSM::Parameter
Properties:
Description: "Path in the S3 Lake where the current entity data is stored."
Type: String
Value: 'Data/Entities/Software/SoftwareCurrent'
Name: "/org/member/local/s3/path/entityCurrent"
Код ресурса сканера:
GenericCrawler:
Type: AWS::Glue::Crawler
Properties:
Role: !Ref RoleNAme
Name: !Sub "${ProfileName}-crawler-${CrawlerName}"
Configuration: !Sub |
{
"Version": 1.0,
"CrawlerOutput": {
"Partitions": { "AddOrUpdateBehavior": "InheritFromTable" },
"Tables": { "AddOrUpdateBehavior": "MergeNewColumns" }
}
}
Description: !Ref CrawlerDescription
DatabaseName: !Ref DatabaseName
Targets:
S3Targets:
- Path: !Sub "s3://${S3DataBucket}/${S3Path}"
Код записи ETL DataSink:
# Write the joined dynamic frame out to a datasink
datasink = glueContext.write_dynamic_frame.from_options(
frame = final_dynamic_frame, connection_type = "s3",
connection_options = {
'path': 's3://{lakeBucketName}/{lakePath}/'.format(
lakeBucketName=args['lakeBucketName'],
lakePath=args['lakeDestinationPath']),
"partitionKeys": ['run_date','location','year','month','day','hour','timestamp']},
format = "parquet",
transformation_ctx = "datasink")
Я надеюсь, что сканер будет смотреть на самую последнюю дату в репозитории, то есть на самую последнюю «папку» раздела run_date, и сканировать ее, не просматривая более старые данные.
Пожалуйста, дайте мне знать, если вы хотите увидеть больше кода - я буду рад очистить и предоставить.





Честно говоря, я не нашел способа чтения/записи данных по динамическим путям с помощью AWS Glue. Что я обычно делаю, так это читаю/пишу с использованием методов PySpark:
datasink.write.\
format("com.databricks.spark.csv").\
option("header", "true").\
mode("overwrite").\
save("s3://my-bucket/files/" + current_date + "*.csv")
Вы даже можете указать методу только чтение/запись файлов определенного типа (например, .csv). PySpark имеет больше возможностей и доступных методов, чем AWS Glue, поэтому обеспечивает большую гибкость. Кроме того, я добавляю запись типа "ключ-значение" в таблицу DynamoDB, чтобы сохранить последнюю дату.
Stellar — отлично сработало, Аида! Теперь мне просто нужно посмотреть, смогу ли я создавать разделы с помощью метода datasink.write.
Спасибо, Аида - я попробую и посмотрю, подойдет ли это для моих нужд. Если это произойдет, я вернусь и выберу это в качестве ответа. Спасибо!