Я работаю над приложением PySpark, работающим на Amazon EMR, где моя задача включает загрузку файлов на основе URL-адреса из DataFrame. Цель состоит в том, чтобы непрерывно загружать эти файлы на исполнитель EMR до тех пор, пока не будет достигнут указанный предел размера файла. Как только этот предел будет достигнут, я собираюсь объединить эти файлы в tar-архив. Однако я столкнулся с проблемой правильного соблюдения ограничения размера файла перед началом процесса создания tar-файла. Я попытался использовать функцию mapPartitions
с целью пакетной обработки нескольких файлов до тех пор, пока они вместе не достигнут ограничения на размер файла, после чего их следует заархивировать в один tar-файл.
Процесс должен работать следующим образом:
Вот упрощенная версия моего подхода:
from pyspark.sql import SparkSession
import os
import uuid
from pyspark.sql.functions import spark_partition_id
# Initialize Spark session
spark = SparkSession.builder.appName("DownloadandTar").getOrCreate()
# Sample DataFrame of files
files = [("file1",), ("file2",), ("file3",), ...]
schema = ["file"]
df = spark.createDataFrame(files, schema=schema)
# Define the file size limit (e.g., 100MB)
FILE_SIZE_LIMIT = 100 * 1024 * 1024 # 100MB
def download_and_tar_files(partition):
accumulated_size = 0
files_to_tar = []
for row in partition:
file = row.filename
file_path, file_size = download_file(file)
files_to_tar.append(file_path)
accumulated_size += file_size
if accumulated_size >= FILE_SIZE_LIMIT:
tar_file_name = f"{uuid.uuid4()}.tar"
create_tar_file(files_to_tar, tar_file_name)
files_to_tar = []
accumulated_size = 0
# Handle any remaining files
if files_to_tar:
tar_file_name = f"{uuid.uuid4()}.tar"
create_tar_file(files_to_tar, tar_file_name)
# Apply the function to each partition
df.repartition(spark_partition_id()).foreachPartition(download_and_tar_files)
Может ли кто-нибудь дать представление о потенциальных проблемах с моим расчетом размера или логикой пакетной обработки? Я также попытался использовать функцию map
для обработки каждой строки имени файла. Несмотря на этот подход, моя текущая реализация приводит к созданию tar-файла для каждого отдельного файла, при этом не удается накопить несколько файлов до достижения порогового размера. Это приводит к выводу, что каждый архив tar содержит только один файл, а не предусмотренную пакетную обработку до тех пор, пока не будет достигнут указанный предел размера.
Проблема, с которой я столкнулся, когда FILE_SIZE_LIMIT
не соблюдается на основных узлах AWS EMR, связана со способом управления глобальными переменными и доступа к ним в распределенной архитектуре Spark. В частности, FILE_SIZE_LIMIT
, будучи глобальной переменной, была инициализирована на основном (главном) узле кластера EMR. Однако эта переменная не была напрямую доступна на основных узлах (исполнителях), поскольку каждый узел работал в своей собственной памяти и пространстве процессов.
После небольшого углубленного изучения я понял, что в приложениях Spark драйвер (работающий на основном узле) и исполнители (работающие на основных узлах) не используют одно и то же пространство памяти. Это означает, что глобальные переменные, объявленные в сценарии драйвера, не передаются автоматически исполнителям. В результате любая логика, основанная на FILE_SIZE_LIMIT
внутри узлов-исполнителей, не будет работать должным образом, поскольку у исполнителей нет правильного значения этой переменной.
Чтобы решить эту проблему, мне нужно было явно передать значение FILE_SIZE_LIMIT
узлам-исполнителям. Это было достигнуто за счет включения его в качестве аргумента функций, выполняемых исполнителями.