Обеспечение соблюдения ограничения размера файла при пакетной обработке загрузок в PySpark на EMR

Я работаю над приложением PySpark, работающим на Amazon EMR, где моя задача включает загрузку файлов на основе URL-адреса из DataFrame. Цель состоит в том, чтобы непрерывно загружать эти файлы на исполнитель EMR до тех пор, пока не будет достигнут указанный предел размера файла. Как только этот предел будет достигнут, я собираюсь объединить эти файлы в tar-архив. Однако я столкнулся с проблемой правильного соблюдения ограничения размера файла перед началом процесса создания tar-файла. Я попытался использовать функцию mapPartitions с целью пакетной обработки нескольких файлов до тех пор, пока они вместе не достигнут ограничения на размер файла, после чего их следует заархивировать в один tar-файл.

Процесс должен работать следующим образом:

  1. Загрузите файлы.
  2. Следите за совокупным размером этих загруженных файлов.
  3. Как только совокупный размер достигнет заранее определенного предела, создайте tar-файл с этими файлами.
  4. Сбросьте отслеживание и запустите процесс заново для следующей партии файлов, пока все они не будут обработаны.

Вот упрощенная версия моего подхода:

from pyspark.sql import SparkSession
import os
import uuid
from pyspark.sql.functions import spark_partition_id

# Initialize Spark session
spark = SparkSession.builder.appName("DownloadandTar").getOrCreate()

# Sample DataFrame of files
files = [("file1",), ("file2",), ("file3",), ...]
schema = ["file"]
df = spark.createDataFrame(files, schema=schema)

# Define the file size limit (e.g., 100MB)
FILE_SIZE_LIMIT = 100 * 1024 * 1024  # 100MB

def download_and_tar_files(partition):
    accumulated_size = 0
    files_to_tar = []

    for row in partition:
        file = row.filename
        file_path, file_size = download_file(file)
        files_to_tar.append(file_path)
        accumulated_size += file_size

        if accumulated_size >= FILE_SIZE_LIMIT:
            tar_file_name = f"{uuid.uuid4()}.tar"
            create_tar_file(files_to_tar, tar_file_name)
            files_to_tar = []
            accumulated_size = 0

    # Handle any remaining files
    if files_to_tar:
        tar_file_name = f"{uuid.uuid4()}.tar"
        create_tar_file(files_to_tar, tar_file_name)

# Apply the function to each partition
df.repartition(spark_partition_id()).foreachPartition(download_and_tar_files)

Может ли кто-нибудь дать представление о потенциальных проблемах с моим расчетом размера или логикой пакетной обработки? Я также попытался использовать функцию map для обработки каждой строки имени файла. Несмотря на этот подход, моя текущая реализация приводит к созданию tar-файла для каждого отдельного файла, при этом не удается накопить несколько файлов до достижения порогового размера. Это приводит к выводу, что каждый архив tar содержит только один файл, а не предусмотренную пакетную обработку до тех пор, пока не будет достигнут указанный предел размера.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
53
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблема, с которой я столкнулся, когда FILE_SIZE_LIMIT не соблюдается на основных узлах AWS EMR, связана со способом управления глобальными переменными и доступа к ним в распределенной архитектуре Spark. В частности, FILE_SIZE_LIMIT, будучи глобальной переменной, была инициализирована на основном (главном) узле кластера EMR. Однако эта переменная не была напрямую доступна на основных узлах (исполнителях), поскольку каждый узел работал в своей собственной памяти и пространстве процессов.

После небольшого углубленного изучения я понял, что в приложениях Spark драйвер (работающий на основном узле) и исполнители (работающие на основных узлах) не используют одно и то же пространство памяти. Это означает, что глобальные переменные, объявленные в сценарии драйвера, не передаются автоматически исполнителям. В результате любая логика, основанная на FILE_SIZE_LIMIT внутри узлов-исполнителей, не будет работать должным образом, поскольку у исполнителей нет правильного значения этой переменной.

Чтобы решить эту проблему, мне нужно было явно передать значение FILE_SIZE_LIMIT узлам-исполнителям. Это было достигнуто за счет включения его в качестве аргумента функций, выполняемых исполнителями.

Другие вопросы по теме