Почему dask не распараллеливает этот рабочий процесс?

У меня есть 2 очень простые функции:

import time

def sleepy(a=1):
    time.sleep(a)
    print(a)

def ending(*args):
    print(args)
    print('finished')

У меня также есть рабочий процесс dask, в котором используются следующие функции:

workflow = {'task_0': (sleepy, 1), 
            'task_1': (sleepy, 2), 
            'task_2': (sleepy, 3), 
            'ending': (ending, 'task_0', 'task_1', 'task_2')}

Этот рабочий процесс можно визуализировать так:

dask.visualize(workflow)

sleepy, sleepy, sleepy должны работать параллельно, но это не так.

Я жду 1 секунду, и он печатает 1 из sleepy(), затем я жду 2 секунды, и он печатает 2, затем я жду еще 3 секунды, и он печатает 3:

1
2
3
(None, None, None)
finished

Что я делаю неправильно?

Вы могли добиться этого, прочитав самый первый пример на dask-учебник.

rpanai 13.09.2018 19:34

@ user32185 Спасибо, сэр.

Legit Stack 13.09.2018 19:44
1
2
51
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Вот как я бы кодировал ваш рабочий процесс, и операции сна действительно происходят параллельно

import dask.delayed
import time

@dask.delayed
def sleepy(a=1):
    time.sleep(a)
    print(a)

@dask.delayed
def ending(*args):
    print(args)
    print('finished')

d = ending(*[sleepy(i) for i in [1, 2, 3]])
d.compute()

Учтите, что декоратор @ - это только синтаксическая аккуратность, также можно сделать dask.delayed(sleepy) и т. д.

Смена dask.get( на dask.threaded.get( устранила мою проблему, но мне также очень понравился ответ mdurant.

Другие вопросы по теме