Распараллелить цикл for в pyspark; одна таблица на итерацию

У меня есть несколько десятков искровых таблиц в Databricks размером от ~1 до ~20 ГБ, и я хочу выполнить функцию для каждой из этих таблиц. Поскольку между результатами каждого запроса нет взаимозависимости, это должно быть легко распараллелить.

Однако я понятия не имею, как поручить pyspark выполнять следующий код параллельно. Это просто продолжается стол за столом.

Это простая демонстрация, показывающая структуру моего кода:

Ячейка 1 (создайте несколько демонстрационных таблиц):

tables = []
columns = list("abc")
for i in range(10):
    nrows = int(1E6)
    ncols = len(columns)
    data = np.random.rand(ncols * nrows).reshape((nrows, ncols))
    schema = ", ".join([f"{_}: float" for _ in columns])
    table = spark.createDataFrame(data=data, schema=schema)
    tables.append(table)

Ячейка 2 (выполнить операцию над каждой из них):

quantiles = {}
for i, table in enumerate(tables):
    quantiles[i] = table.approxQuantile(columns, [0.01, 0.99], relativeError=0.001)

Примечание. Демо-версия немного упрощена. На самом деле в каждой таблице есть разные столбцы, поэтому я не могу их просто соединить.

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
167
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

ты не можешь просто использовать parallelize?

Каждый элемент списка tables должен быть PySpark DataFrame, созданным с помощью spark.createDataFrame

# create a RDD of the tables
# `sc` a.k.a. SparkContext should be already created once you started the session in Databricks
rdd_tables = sc.parallelize(tables)

# apply the function to each table in parallel
rdd_quantiles = rdd_tables.map(compute_quantiles)

quantiles = rdd_quantiles.collect()

Затем вы можете сначала создать список фреймов данных, а затем использовать функции parallelize и map.

Выглядит многообещающе, но приводит к ошибке во время sc.parallelize(tables) PicklingError: Не удалось сериализовать объект: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] Похоже, вы пытаетесь ссылаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext можно использовать только в драйвере, а не в коде, который он запускает на рабочих процессах. Подробнее см. ИСКРА-5063 Ну, я не совсем понимаю, что бы я сюда вкладывал 🤷‍♂️

ascripter 07.05.2024 13:14

можете ли вы попытаться создать список массивов и схем NumPy и увидеть результат?

vagitus 07.05.2024 13:43

Не понимаю, что вы имеете в виду — список чего? схема для каждой таблицы? И что тогда с этим делать? ... Тем временем я пытался распараллелить вручную через SparkContext.runJob, но безрезультатно :-/

ascripter 07.05.2024 13:57

Я знаю, что такое массив numpy. Я до сих пор не понимаю, как это поможет мне с моей проблемой распараллеливания вызова функции в нескольких таблицах.

ascripter 07.05.2024 14:05
Ответ принят как подходящий
from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            print(running_task.result())

# Replace with actual logic.
def process_table(table_name):
    return table_name, spark.sql(f"select count(*) from {table_name}").collect()[0][0]

table_names = ['A', 'B', 'C']

# Flatten the list of lambda functions before passing it to the executor
tasks = [lambda x=name: process_table(x) for name in table_names]

run_io_tasks_in_parallel(tasks)

Обновлено: Изменен пример с предоставленным примером кода. Надеюсь это поможет.

import numpy as np

tables = []
for i in range(10):
    columns = list("abc")
    nrows = int(1E6)
    ncols = len(columns)
    data = np.random.rand(ncols * nrows).reshape((nrows, ncols)).tolist()
    schema = ", ".join([f"{_}: float" for _ in columns])
    table = spark.createDataFrame(data=data, schema=schema)
    tables.append(table)

from concurrent.futures import ThreadPoolExecutor
quantiles = {}

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            index, result = running_task.result()
            quantiles[index] = result

def process_table(index, table):
    return index, table.approxQuantile(columns, [0.01, 0.99], relativeError=0.001)

# Flatten the list of lambda functions before passing it to the executor
tasks = [
    lambda x=index, y=table: 
    process_table(x, y) for index, table in enumerate(tables)
]

run_io_tasks_in_parallel(tasks)

Другие вопросы по теме