SCOOP - Как заставить рабочих ждать корневого работника, прежде чем продолжить

Я использую SCOOP (и Python 3.6 - не может быть обновлен) в своей работе. Мне нужно, чтобы все рабочие выполняли вычисление, затем ждали, пока корневой узел выполнит медленное вычисление (код в if __name__ == '__main__':), затем выполнил другое вычисление с кадром данных, полученным в результате вычисления корневого узла.

Моя проблема в том, что SCOOP инициирует сразу всех воркеров и они пытаются запустить весь код вне if __name__ == '__main__': асинхронно, даже если он ниже блока if. Поскольку фрейма данных еще нет, они выдают ошибку.

Какая команда может заставить всех рабочих ждать, пока корневой рабочий процесс завершит вычисления, прежде чем продолжить выполнение остального кода?


Я безуспешно пытался экспериментировать с scoop.futures.map, scoop.futures.supply и multiprocessing.managers. Я также пытался использовать multiprocessing.Barrier(8).wait(), но это не сработало.

Есть метод scoop.futures.wait(futures), но я не знаю, как получить аргумент фьючерса...

У меня есть что-то вроде:

import pandas as pd
import genetic_algorithm
from scoop import futures

df = pd.read_csv('database.csv') # dataframe is to large to be passed to fitness_function for every worker. I want every worker to have a copy of it!

if __name__ == '__main__':
    df = add_new_columns(df) # heavy computation which I just want to perform once (not by all workers)

df = computation_using_new_columns(df) # <--- !!! error - is executed before slow add_new_columns(df) finishes

def fitness_function(): ... # all workers use fitness_function() and an error is thrown if I put it inside the if __name__ == '__main__':

if __name__ == '__main__':
    results = list(futures.map(genetic_algorithm, df))

и выполните скрипт с помощью python3 -m scoop script.py, который сразу запускает всех рабочих...

может сделать минимальный воспроизводимый пример проблемы? на текущее состояние вопроса нельзя ответить, не зная, что делает каждая из этих функций, но все они должны быть внутри предложения if __name__ == "__main__:"

Ahmed AEK 25.01.2023 18:06

@AhmedAEK Я думаю, что они мне нужны из if, потому что, если я помещу их все внутрь, выдаст ошибку, что функции, определенные внутри if, не определены для некоторых рабочих.

João Bravo 25.01.2023 18:10

из вашего описания проблемы сложно сказать, что нужно делать, а что нет, нужен минимальный воспроизводимый пример.

Ahmed AEK 25.01.2023 18:11

сделал что-то более явное, это сложный генетический алгоритм, поэтому его трудно воспроизвести

João Bravo 25.01.2023 18:23

глядя на структуру SCOOP, я не вижу никаких инструментов для выполнения этой работы, возможно, вы могли бы использовать более гибкие инструменты, такие как встроенный модуль concurrent.futures Python, я мог бы написать ответ для этого.

Ahmed AEK 25.01.2023 18:34

Если это сработает, то будет просто отлично

João Bravo 25.01.2023 18:35
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
6
59
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

каждый процесс имеет свое собственное пространство памяти, изменение фрейма данных в основном процессе не влияет на рабочие процессы, вам необходимо передать его рабочим процессам с помощью какого-либо инициализатора после его обработки, который, похоже, недоступен в Фреймворк SCOOP, более гибким (но немного более сложным) инструментом будет встроенный в Python модуль multiprocessing.Pool.

import pandas as pd
import genetic_algorithm
from multiprocessing import Pool

def fitness_function(): ...

def initializer_func(df_from_parent):
    global df
    df = df_from_parent
    df = computation_using_new_columns(df)

if __name__ == '__main__':
    df = pd.read_csv(
        'database.csv')  
    # read the df in the main process only as it needs to be modified
    # before sending it to the workers

    df = add_new_columns(df)  # modify the df in the main process
    # create as much workers as your cpu cores, and passes the df to them, and have each worker
    # execute the computation_using_new_columns on it
    with Pool(initializer=initializer_func, initargs=(df,)) as pool:
        results = list(pool.imap(genetic_algorithm, df))  # now do your function

если computation_using_new_columns нужно выполнить в каждом воркере, вы можете оставить его в инициализаторе, но если его нужно выполнить только один раз, вы можете поместить его после add_new_columns внутри if __name__ == "__main__".

Спасибо! На самом деле у меня есть две точки, в которых я вызываю метод распараллеливания pool.map. Должен ли я открывать блок with ProcessPoolExecutor(initializer=initializer_func, initargs=(df,)) as pool: перед каждым вызовом или я должен просто поместить весь код между pool.map в блоке with? Другими словами, неэффективно ли открывать пул дважды?

João Bravo 27.01.2023 11:25

@JoãoBravo нет, вам не следует открывать пул дважды, я бы рекомендовал вместо этого сохранить его в переменной и повторно использовать в обоих местах pool = ProcessPoolExecutor(initializer=initializer_func, initargs=(df,)) и просто позволить python определить время жизни или просто поместить весь код в блок with.

Ahmed AEK 27.01.2023 11:29

Понятно! Между тем, python выдает эту ошибку: Traceback (most recent call last):File "script.py", line 512, in <module>with ProcessPoolExecutor(initializer=get_dataframe_for_GA, initargs=(df,)) as pool:TypeError: __init__() got an unexpected keyword argument 'initializer'

João Bravo 27.01.2023 11:57

подпись метода просто ProcessPoolExecutor(max_workers=None)

João Bravo 27.01.2023 11:59

@JoãoBravo, скорее всего, вы используете очень старую версию Python (> 5 лет), можете ли вы обновить ее как минимум до 3.7? В противном случае вам придется реализовать один...

Ahmed AEK 27.01.2023 12:14

Это среда с Python 3.6, связанная с моей работой, которую нельзя обновить :( Как мне ее реализовать?

João Bravo 27.01.2023 12:24

@JoãoBravo Сначала я обновил ответ, чтобы вместо этого использовать multiprocessing.Pool, посмотрите, работает ли он для вас.

Ahmed AEK 27.01.2023 12:27

Большое спасибо. Он полностью рабочий! Я отметил вас как правильный.

João Bravo 27.01.2023 12:54

Другие вопросы по теме