Я использую SCOOP (и Python 3.6 - не может быть обновлен) в своей работе. Мне нужно, чтобы все рабочие выполняли вычисление, затем ждали, пока корневой узел выполнит медленное вычисление (код в if __name__ == '__main__':
), затем выполнил другое вычисление с кадром данных, полученным в результате вычисления корневого узла.
Моя проблема в том, что SCOOP инициирует сразу всех воркеров и они пытаются запустить весь код вне if __name__ == '__main__':
асинхронно, даже если он ниже блока if
. Поскольку фрейма данных еще нет, они выдают ошибку.
Какая команда может заставить всех рабочих ждать, пока корневой рабочий процесс завершит вычисления, прежде чем продолжить выполнение остального кода?
Я безуспешно пытался экспериментировать с scoop.futures.map
, scoop.futures.supply
и multiprocessing.managers
. Я также пытался использовать multiprocessing.Barrier(8).wait()
, но это не сработало.
Есть метод scoop.futures.wait(futures)
, но я не знаю, как получить аргумент фьючерса...
У меня есть что-то вроде:
import pandas as pd
import genetic_algorithm
from scoop import futures
df = pd.read_csv('database.csv') # dataframe is to large to be passed to fitness_function for every worker. I want every worker to have a copy of it!
if __name__ == '__main__':
df = add_new_columns(df) # heavy computation which I just want to perform once (not by all workers)
df = computation_using_new_columns(df) # <--- !!! error - is executed before slow add_new_columns(df) finishes
def fitness_function(): ... # all workers use fitness_function() and an error is thrown if I put it inside the if __name__ == '__main__':
if __name__ == '__main__':
results = list(futures.map(genetic_algorithm, df))
и выполните скрипт с помощью python3 -m scoop script.py
, который сразу запускает всех рабочих...
@AhmedAEK Я думаю, что они мне нужны из if
, потому что, если я помещу их все внутрь, выдаст ошибку, что функции, определенные внутри if
, не определены для некоторых рабочих.
из вашего описания проблемы сложно сказать, что нужно делать, а что нет, нужен минимальный воспроизводимый пример.
сделал что-то более явное, это сложный генетический алгоритм, поэтому его трудно воспроизвести
глядя на структуру SCOOP
, я не вижу никаких инструментов для выполнения этой работы, возможно, вы могли бы использовать более гибкие инструменты, такие как встроенный модуль concurrent.futures
Python, я мог бы написать ответ для этого.
Если это сработает, то будет просто отлично
каждый процесс имеет свое собственное пространство памяти, изменение фрейма данных в основном процессе не влияет на рабочие процессы, вам необходимо передать его рабочим процессам с помощью какого-либо инициализатора после его обработки, который, похоже, недоступен в Фреймворк SCOOP, более гибким (но немного более сложным) инструментом будет встроенный в Python модуль multiprocessing.Pool.
import pandas as pd
import genetic_algorithm
from multiprocessing import Pool
def fitness_function(): ...
def initializer_func(df_from_parent):
global df
df = df_from_parent
df = computation_using_new_columns(df)
if __name__ == '__main__':
df = pd.read_csv(
'database.csv')
# read the df in the main process only as it needs to be modified
# before sending it to the workers
df = add_new_columns(df) # modify the df in the main process
# create as much workers as your cpu cores, and passes the df to them, and have each worker
# execute the computation_using_new_columns on it
with Pool(initializer=initializer_func, initargs=(df,)) as pool:
results = list(pool.imap(genetic_algorithm, df)) # now do your function
если computation_using_new_columns
нужно выполнить в каждом воркере, вы можете оставить его в инициализаторе, но если его нужно выполнить только один раз, вы можете поместить его после add_new_columns
внутри if __name__ == "__main__"
.
Спасибо! На самом деле у меня есть две точки, в которых я вызываю метод распараллеливания pool.map. Должен ли я открывать блок with ProcessPoolExecutor(initializer=initializer_func, initargs=(df,)) as pool:
перед каждым вызовом или я должен просто поместить весь код между pool.map в блоке with
? Другими словами, неэффективно ли открывать пул дважды?
@JoãoBravo нет, вам не следует открывать пул дважды, я бы рекомендовал вместо этого сохранить его в переменной и повторно использовать в обоих местах pool = ProcessPoolExecutor(initializer=initializer_func, initargs=(df,))
и просто позволить python определить время жизни или просто поместить весь код в блок with.
Понятно! Между тем, python выдает эту ошибку: Traceback (most recent call last):
File "script.py", line 512, in <module>
with ProcessPoolExecutor(initializer=get_dataframe_for_GA, initargs=(df,)) as pool:
TypeError: __init__() got an unexpected keyword argument 'initializer'
подпись метода просто ProcessPoolExecutor(max_workers=None)
@JoãoBravo, скорее всего, вы используете очень старую версию Python (> 5 лет), можете ли вы обновить ее как минимум до 3.7
? В противном случае вам придется реализовать один...
Это среда с Python 3.6, связанная с моей работой, которую нельзя обновить :( Как мне ее реализовать?
@JoãoBravo Сначала я обновил ответ, чтобы вместо этого использовать multiprocessing.Pool, посмотрите, работает ли он для вас.
Большое спасибо. Он полностью рабочий! Я отметил вас как правильный.
может сделать минимальный воспроизводимый пример проблемы? на текущее состояние вопроса нельзя ответить, не зная, что делает каждая из этих функций, но все они должны быть внутри предложения
if __name__ == "__main__:"