Убить всех «работников» при ошибке «слушателя» (многопроцессорность, настройка менеджера и очереди)

Я использую многопроцессорность для параллельного запуска workers разных файлов. Worker результаты поставлены в очередь. listener получает результаты из очереди и записывает их в файл.

Иногда listener может столкнуться с ошибками (различного происхождения). В этом случае listener молча умирает, но все остальные процессы продолжают работать (довольно удивительно, что ошибки worker приводят к завершению всех процессов).

Я хотел бы остановить все процессы (workers, listener и т. д.), когда listener обнаруживает ошибку. Как это можно сделать?

Схема моего кода следующая:

def worker(file_path, q):
    ## do something
    q.put(1.)
    return True

def listener(q):
    while True:
        m = q.get()
            if m == 'kill':
                break
            else:
                try:
                    # do something and write to file
                except Exception as err:
                    # raise error
                    tb = sys.exc_info()[2]
                    raise err.with_traceback(tb)

def main():
    manager = mp.Manager()
    q = manager.Queue(maxsize=3)
    with mp.Pool(5) as pool:
        watcher = pool.apply_async(listener, (q,))
        files = ['path_1','path_2','path_3'] 
        jobs = [ pool.apply_async(worker, (p,q,)) for p in files ]
        
        # fire off workers
        for job in jobs: 
            job.get()
        # kill the listener when done
        q.put('kill')

# run
if __name__ == "__main__":
   main()

Я попытался ввести event = manager.Event() и использовать его как флаг в main():

## inside the pool, after starting workers
while True:
    if event.is_set():
        for job in jobs:
            job.terminate()

Нет успеха. Вызов os._exit(1) в блоке исключений listener вызывает ошибку неработающего канала, но процессы не уничтожаются.

Я также попытался установить daemon = True,

for job in jobs:
    job.daemon = True

Не помогло.

На самом деле, для обработки listener исключений я использую callable, как того требует apply_async (чтобы они не полностью отключались). Это усложняет ситуацию, но ненамного.

Заранее спасибо.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
71
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Как всегда, есть много способов выполнить то, что вам нужно, но я бы, вероятно, предложил использовать Event, чтобы сигнализировать о том, что процессы должны завершиться. Я также не стал бы использовать Pool в этом случае, так как это действительно упрощает вещи только для простых случаев, когда вам нужно что-то вроде map. Более сложные варианты использования быстро упрощают создание собственного «пула» с необходимой вам функциональностью.

from multiprocessing import Process, Queue, Event
from random import random


def might_fail(a):
    assert(a > .001)

def worker(args_q: Queue, result_q: Queue, do_quit: Event):
    try:
        while not do_quit.is_set():
            args = args_q.get()
            if args is None:
                break
            else:
                # do something
                result_q.put(random())
    finally: #signal that worker is exiting even if exception is raised
        result_q.put(None) #signal listener that worker is exiting

def listener(result_q: Queue, do_quit: Event, n_workers: int):
    n_completed = 0
    while n_workers > 0:
        res = result_q.get()
        if res is None:
            n_workers -= 1
        else:
            n_completed += 1
            try:
                might_fail(res)
            except:
                do_quit.set() #let main continue
                print(n_completed)
                raise #reraise error after we signal others to stop
    do_quit.set() #let main continue
    print(n_completed)

if __name__ == "__main__":
    args_q = Queue()
    result_q = Queue()
    do_quit = Event()
    n_workers = 4

    listener_p = Process(target=listener, args=(result_q, do_quit, n_workers))
    listener_p.start()

    for _ in range(n_workers):
        worker_p = Process(target=worker, args=(args_q, result_q, do_quit))
        worker_p.start()

    for _ in range(1000):
        args_q.put("some/file.txt")

    for _ in range(n_workers):
        args_q.put(None)

    do_quit.wait()
    print('done')

Это значительно проясняет структуру программы, спасибо! Ваш код приятно читать. Тем не менее, это не решает проблему: worker может начать работать, когда do_quit равно False, но после завершения do_quit станет True. В этом случае worker зависает, так как listener умер, и Queue некуда выпустить результат. Я сохранил error, не поднимая его явно внутри цикла, и поднял его вне цикла. Это сработало. В очередной раз благодарим за помощь!

Ivan K. 23.11.2022 20:00

@ИванК. Проблема, которую вы описываете, на самом деле является одним из крайних случаев, которые рассматривает Pool, и имеет код очистки для очистки очередей как часть процесса закрытия пула. Я не включил это в свой пример, чтобы сделать его кратким. Если вы согласны с подходом «убить его огнем», вы можете .terminate() всех рабочих процессов и .cancel_join_thread() всех очередей из основного процесса, как только do_quit.wait() вернется. Единственная проблема заключается в том, что terminate может вытеснять любые буферизованные данные stdout или stderr из дочерних процессов, что в моем случае приводило к тому, что исключение не всегда печаталось.

Aaron 23.11.2022 22:57

Это также отличный пример того, почему рекомендуется добавлять тайм-ауты к put и get, когда это возможно, но опять же, это немного удлиняет пример.

Aaron 23.11.2022 23:02

Другие вопросы по теме