Извлечение данных из хранилища BLOB-объектов в Databricks[автоматизация]

У меня есть данные больших двоичных объектов в разных папках по году, месяцу и дате (вложенная папка), которые обновляются ежедневно. Мне нужно спроектировать конвейер, который будет эффективно загружать исторические данные из больших двоичных объектов в блоки данных Azure. Не могли бы вы рассказать, как правильно хранить ежедневные и исторические данные в блоках данных?

Последующие шаги: Создана точка монтирования с участником хранилища BLOB-объектов и возможность доступа к образцам данных, например к одному файлу паркета 2024/18/7/table_name.parquet.

Каким должен быть способ ежедневной загрузки с автоматизацией вместе с историческими данными. Спасибо

Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
0
61
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Здесь вам необходимо учитывать две вещи при копировании данных из учетной записи хранения в блоки данных.

  1. Копирование всех файлов в один файл в блоках данных, т. е. исходные файлы должны быть объединены в один целевой файл в блоках данных.

    Здесь прежде всего вам нужно скопировать все старые файлы дат в файл блоков данных. Затем каждый день объединяйте файлы ежедневных дат с одним и тем же файлом блоков данных.

    Код для копирования всех файлов дат в целевой файл.

    df1=spark.read.parquet('/mnt/mymount/2024/*/*/*.parquet',inferSchema=True)
    df1.write.parquet('/dbfs/FileStore/tables/target.parquet')
    dbutils.fs.ls('/dbfs/FileStore/tables')
    

    Теперь возьмите другой блокнот и используйте приведенный ниже код, который загружает файлы ежедневных дат в одно и то же место.

    from datetime import datetime
    temp_date=datetime.today().strftime('%Y/%d/%m')
    mydate=temp_date[0:temp_date.rfind('/')]+'/'+str(int(temp_date.split('/')[-1]))
    print(mydate)
    
    mydf=spark.read.parquet('/dbfs/FileStore/tables/target.parquet',inferSchema=True)
    files_path='/mnt/mymount/'+mydate+'/*.parquet'
    mydf.union(spark.read.parquet(files_path,inferSchema=True))
    mydf.display()
    mydf.write.mode("overwrite").parquet('/dbfs/FileStore/tables/target.parquet')
    

    Вам нужно запланировать этот блокнот каждый день, чтобы он копировал ежедневные данные.

  2. Копирование всех файлов в блоки данных как отдельных файлов с одинаковой структурой папок файлов.

    Используйте приведенный ниже код, чтобы сначала скопировать файлы в цель.

    import glob, os
    list_of_paths=[]
    for file in glob.iglob('/dbfs/mnt/mymount/2024/**/*.parquet',recursive=True):
        list_of_paths.append(file.replace('/dbfs',''))
    print(list_of_paths)
    
    for i in list_of_paths:
        spark.read.parquet(i,inferSchema=True).write.parquet('/dbfs/FileStore/tables/'+i.replace('/mnt/mymount/',''))
    

    Этот код рекурсивно получает все пути к файлам и копирует их в целевое расположение, создавая ту же структуру, что и исходный.

    А для ежедневного использования используйте приведенный ниже код в новом блокноте.

    files_today_path='/mnt/mymount/'+mydate
    list_today_paths=[]
    for i in dbutils.fs.ls(files_today_path):
        list_today_paths.append(i.path)
    print(list_today_paths)
    for i in list_of_paths:
        spark.read.parquet(i,inferSchema=True).write.mode("overwrite").parquet('/dbfs/FileStore/tables/'+i.replace('dbfs:/mnt/mymount/',''))
    

    Получите текущий формат даты папки, как указано выше, и используйте его. Запланируйте этот блокнот каждый день.

Спасибо Ракеш за подробные усилия. Я попробую.

Saswat Ray 19.07.2024 13:03

Другие вопросы по теме