У меня есть несколько десятков искровых таблиц в Databricks размером от ~1 до ~20 ГБ, и я хочу выполнить функцию для каждой из этих таблиц. Поскольку между результатами каждого запроса нет взаимозависимости, это должно быть легко распараллелить.
Однако я понятия не имею, как поручить pyspark выполнять следующий код параллельно. Это просто продолжается стол за столом.
Это простая демонстрация, показывающая структуру моего кода:
Ячейка 1 (создайте несколько демонстрационных таблиц):
tables = []
columns = list("abc")
for i in range(10):
nrows = int(1E6)
ncols = len(columns)
data = np.random.rand(ncols * nrows).reshape((nrows, ncols))
schema = ", ".join([f"{_}: float" for _ in columns])
table = spark.createDataFrame(data=data, schema=schema)
tables.append(table)
Ячейка 2 (выполнить операцию над каждой из них):
quantiles = {}
for i, table in enumerate(tables):
quantiles[i] = table.approxQuantile(columns, [0.01, 0.99], relativeError=0.001)
Примечание. Демо-версия немного упрощена. На самом деле в каждой таблице есть разные столбцы, поэтому я не могу их просто соединить.





ты не можешь просто использовать parallelize?
Каждый элемент списка tables должен быть PySpark DataFrame, созданным с помощью spark.createDataFrame
# create a RDD of the tables
# `sc` a.k.a. SparkContext should be already created once you started the session in Databricks
rdd_tables = sc.parallelize(tables)
# apply the function to each table in parallel
rdd_quantiles = rdd_tables.map(compute_quantiles)
quantiles = rdd_quantiles.collect()
Затем вы можете сначала создать список фреймов данных, а затем использовать функции parallelize и map.
можете ли вы попытаться создать список массивов и схем NumPy и увидеть результат?
Не понимаю, что вы имеете в виду — список чего? схема для каждой таблицы? И что тогда с этим делать? ... Тем временем я пытался распараллелить вручную через SparkContext.runJob, но безрезультатно :-/
Я знаю, что такое массив numpy. Я до сих пор не понимаю, как это поможет мне с моей проблемой распараллеливания вызова функции в нескольких таблицах.
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
print(running_task.result())
# Replace with actual logic.
def process_table(table_name):
return table_name, spark.sql(f"select count(*) from {table_name}").collect()[0][0]
table_names = ['A', 'B', 'C']
# Flatten the list of lambda functions before passing it to the executor
tasks = [lambda x=name: process_table(x) for name in table_names]
run_io_tasks_in_parallel(tasks)
Обновлено: Изменен пример с предоставленным примером кода. Надеюсь это поможет.
import numpy as np
tables = []
for i in range(10):
columns = list("abc")
nrows = int(1E6)
ncols = len(columns)
data = np.random.rand(ncols * nrows).reshape((nrows, ncols)).tolist()
schema = ", ".join([f"{_}: float" for _ in columns])
table = spark.createDataFrame(data=data, schema=schema)
tables.append(table)
from concurrent.futures import ThreadPoolExecutor
quantiles = {}
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
index, result = running_task.result()
quantiles[index] = result
def process_table(index, table):
return index, table.approxQuantile(columns, [0.01, 0.99], relativeError=0.001)
# Flatten the list of lambda functions before passing it to the executor
tasks = [
lambda x=index, y=table:
process_table(x, y) for index, table in enumerate(tables)
]
run_io_tasks_in_parallel(tasks)
Выглядит многообещающе, но приводит к ошибке во время
sc.parallelize(tables)PicklingError: Не удалось сериализовать объект: PySparkRuntimeError: [CONTEXT_ONLY_VALID_ON_DRIVER] Похоже, вы пытаетесь ссылаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext можно использовать только в драйвере, а не в коде, который он запускает на рабочих процессах. Подробнее см. ИСКРА-5063 Ну, я не совсем понимаю, что бы я сюда вкладывал 🤷♂️