Я использую многопроцессорность для параллельного запуска workers
разных файлов. Worker
результаты поставлены в очередь. listener
получает результаты из очереди и записывает их в файл.
Иногда listener
может столкнуться с ошибками (различного происхождения). В этом случае listener
молча умирает, но все остальные процессы продолжают работать (довольно удивительно, что ошибки worker
приводят к завершению всех процессов).
Я хотел бы остановить все процессы (worker
s, listener
и т. д.), когда listener
обнаруживает ошибку. Как это можно сделать?
Схема моего кода следующая:
def worker(file_path, q):
## do something
q.put(1.)
return True
def listener(q):
while True:
m = q.get()
if m == 'kill':
break
else:
try:
# do something and write to file
except Exception as err:
# raise error
tb = sys.exc_info()[2]
raise err.with_traceback(tb)
def main():
manager = mp.Manager()
q = manager.Queue(maxsize=3)
with mp.Pool(5) as pool:
watcher = pool.apply_async(listener, (q,))
files = ['path_1','path_2','path_3']
jobs = [ pool.apply_async(worker, (p,q,)) for p in files ]
# fire off workers
for job in jobs:
job.get()
# kill the listener when done
q.put('kill')
# run
if __name__ == "__main__":
main()
Я попытался ввести event = manager.Event()
и использовать его как флаг в main():
## inside the pool, after starting workers
while True:
if event.is_set():
for job in jobs:
job.terminate()
Нет успеха. Вызов os._exit(1) в блоке исключений listener
вызывает ошибку неработающего канала, но процессы не уничтожаются.
Я также попытался установить daemon = True,
for job in jobs:
job.daemon = True
Не помогло.
На самом деле, для обработки listener
исключений я использую callable, как того требует apply_async
(чтобы они не полностью отключались). Это усложняет ситуацию, но ненамного.
Заранее спасибо.
Как всегда, есть много способов выполнить то, что вам нужно, но я бы, вероятно, предложил использовать Event
, чтобы сигнализировать о том, что процессы должны завершиться. Я также не стал бы использовать Pool
в этом случае, так как это действительно упрощает вещи только для простых случаев, когда вам нужно что-то вроде map
. Более сложные варианты использования быстро упрощают создание собственного «пула» с необходимой вам функциональностью.
from multiprocessing import Process, Queue, Event
from random import random
def might_fail(a):
assert(a > .001)
def worker(args_q: Queue, result_q: Queue, do_quit: Event):
try:
while not do_quit.is_set():
args = args_q.get()
if args is None:
break
else:
# do something
result_q.put(random())
finally: #signal that worker is exiting even if exception is raised
result_q.put(None) #signal listener that worker is exiting
def listener(result_q: Queue, do_quit: Event, n_workers: int):
n_completed = 0
while n_workers > 0:
res = result_q.get()
if res is None:
n_workers -= 1
else:
n_completed += 1
try:
might_fail(res)
except:
do_quit.set() #let main continue
print(n_completed)
raise #reraise error after we signal others to stop
do_quit.set() #let main continue
print(n_completed)
if __name__ == "__main__":
args_q = Queue()
result_q = Queue()
do_quit = Event()
n_workers = 4
listener_p = Process(target=listener, args=(result_q, do_quit, n_workers))
listener_p.start()
for _ in range(n_workers):
worker_p = Process(target=worker, args=(args_q, result_q, do_quit))
worker_p.start()
for _ in range(1000):
args_q.put("some/file.txt")
for _ in range(n_workers):
args_q.put(None)
do_quit.wait()
print('done')
@ИванК. Проблема, которую вы описываете, на самом деле является одним из крайних случаев, которые рассматривает Pool
, и имеет код очистки для очистки очередей как часть процесса закрытия пула. Я не включил это в свой пример, чтобы сделать его кратким. Если вы согласны с подходом «убить его огнем», вы можете .terminate()
всех рабочих процессов и .cancel_join_thread()
всех очередей из основного процесса, как только do_quit.wait()
вернется. Единственная проблема заключается в том, что terminate
может вытеснять любые буферизованные данные stdout или stderr из дочерних процессов, что в моем случае приводило к тому, что исключение не всегда печаталось.
Это также отличный пример того, почему рекомендуется добавлять тайм-ауты к put
и get
, когда это возможно, но опять же, это немного удлиняет пример.
Это значительно проясняет структуру программы, спасибо! Ваш код приятно читать. Тем не менее, это не решает проблему:
worker
может начать работать, когдаdo_quit
равно False, но после завершенияdo_quit
станет True. В этом случае worker зависает, так какlistener
умер, и Queue некуда выпустить результат. Я сохранилerror
, не поднимая его явно внутри цикла, и поднял его вне цикла. Это сработало. В очередной раз благодарим за помощь!