У меня есть данные больших двоичных объектов в разных папках по году, месяцу и дате (вложенная папка), которые обновляются ежедневно. Мне нужно спроектировать конвейер, который будет эффективно загружать исторические данные из больших двоичных объектов в блоки данных Azure. Не могли бы вы рассказать, как правильно хранить ежедневные и исторические данные в блоках данных?
Последующие шаги: Создана точка монтирования с участником хранилища BLOB-объектов и возможность доступа к образцам данных, например к одному файлу паркета 2024/18/7/table_name.parquet.
Каким должен быть способ ежедневной загрузки с автоматизацией вместе с историческими данными. Спасибо
Здесь вам необходимо учитывать две вещи при копировании данных из учетной записи хранения в блоки данных.
Копирование всех файлов в один файл в блоках данных, т. е. исходные файлы должны быть объединены в один целевой файл в блоках данных.
Здесь прежде всего вам нужно скопировать все старые файлы дат в файл блоков данных. Затем каждый день объединяйте файлы ежедневных дат с одним и тем же файлом блоков данных.
Код для копирования всех файлов дат в целевой файл.
df1=spark.read.parquet('/mnt/mymount/2024/*/*/*.parquet',inferSchema=True)
df1.write.parquet('/dbfs/FileStore/tables/target.parquet')
dbutils.fs.ls('/dbfs/FileStore/tables')
Теперь возьмите другой блокнот и используйте приведенный ниже код, который загружает файлы ежедневных дат в одно и то же место.
from datetime import datetime
temp_date=datetime.today().strftime('%Y/%d/%m')
mydate=temp_date[0:temp_date.rfind('/')]+'/'+str(int(temp_date.split('/')[-1]))
print(mydate)
mydf=spark.read.parquet('/dbfs/FileStore/tables/target.parquet',inferSchema=True)
files_path='/mnt/mymount/'+mydate+'/*.parquet'
mydf.union(spark.read.parquet(files_path,inferSchema=True))
mydf.display()
mydf.write.mode("overwrite").parquet('/dbfs/FileStore/tables/target.parquet')
Вам нужно запланировать этот блокнот каждый день, чтобы он копировал ежедневные данные.
Копирование всех файлов в блоки данных как отдельных файлов с одинаковой структурой папок файлов.
Используйте приведенный ниже код, чтобы сначала скопировать файлы в цель.
import glob, os
list_of_paths=[]
for file in glob.iglob('/dbfs/mnt/mymount/2024/**/*.parquet',recursive=True):
list_of_paths.append(file.replace('/dbfs',''))
print(list_of_paths)
for i in list_of_paths:
spark.read.parquet(i,inferSchema=True).write.parquet('/dbfs/FileStore/tables/'+i.replace('/mnt/mymount/',''))
Этот код рекурсивно получает все пути к файлам и копирует их в целевое расположение, создавая ту же структуру, что и исходный.
А для ежедневного использования используйте приведенный ниже код в новом блокноте.
files_today_path='/mnt/mymount/'+mydate
list_today_paths=[]
for i in dbutils.fs.ls(files_today_path):
list_today_paths.append(i.path)
print(list_today_paths)
for i in list_of_paths:
spark.read.parquet(i,inferSchema=True).write.mode("overwrite").parquet('/dbfs/FileStore/tables/'+i.replace('dbfs:/mnt/mymount/',''))
Получите текущий формат даты папки, как указано выше, и используйте его. Запланируйте этот блокнот каждый день.
Спасибо Ракеш за подробные усилия. Я попробую.