Переменная широковещательной рассылки dask завершается с ошибкой ключа при вычислении подмножества кадра данных pandas

У меня есть фрейм данных pandas, и я хочу применить дорогостоящую операцию к каждой группе. Поэтому я хочу распараллелить эту задачу с помощью dask. Исходный фрейм данных должен быть широковещательно передан. Но вычисление терпит неудачу только с:

<Future: error, key: iterated_costly_function-4aff5e66b6af1c073dc2cfd0d2dbb6f3>
<Future: error, key: iterated_costly_function-74d26e42c758a8cc177047d7a0f49ff4>

Вот код:

import pandas as pd
df = pd.DataFrame({'foo':[1,2,3,4,5,6], 'bar':['a', 'a', 'b', 'b', 'a', 'b']})
display(df)

unique_values = df.bar.unique()
print(unique_values)
for v in unique_values:
    subset_df = df[df.bar == v]
    display(subset_df)

Теперь при использовании dask:

import pandas as pd
from tqdm import tqdm
tqdm.pandas()
from time import sleep

from dask.distributed import Client, progress
from dask.distributed import wait, as_completed
from dask.distributed import Variable

from dask import delayed
# https://stackoverflow.com/questions/49406987/how-do-we-choose-nthreads-and-nprocs-per-worker-in-dask-distributed
client = Client()#threads_per_worker=8, n_workers=2)
client

remote_df = client.scatter(df, broadcast=True)
global_var = Variable(name = "remote_data")
global_var.set(remote_df)

def iterated_costly_function(v):
    df = global_var.get()
    subset_df = df[df.bar == v]
    #subset_df = apply_some_costly_function(subset_df, x=1, y=2, z=3)
    # not implemented here for sake of simplicity
    sleep(3)
    return subset_df#.values # make it return something

futures = client.map(iterated_costly_function, unique_values)

wait(futures)
for f in tqdm(futures):
    print(f)

Что не так в том, как я пытаюсь получить доступ к широковещательной переменной?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
165
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я бы написал вашу функцию так

def iterated_costly_function(v):
    df = Variable(name = "remote_data").get().result()
    subset_df = df[df.bar == v]
    sleep(3)
    return subset_df#.values

где

  • мы явно создаем экземпляр Variable, используя его имя, а не передаем его в замыкании (вы могли бы передать строку имени в качестве аргумента)
  • поскольку данные на самом деле являются будущим, вам нужно .result(), чтобы получить их ценность.

Другие вопросы по теме