Позвольте мне начать с того, что я не использую очередь, поэтому этот вопрос не является дубликатом Вот этот, и я не использую пул процессов, поэтому это не дубликат Вот этот.
У меня есть объект Process, который использует пул рабочих потоков для выполнения некоторой задачи. Ради MCVE эта задача просто создает список целых чисел от 0 до 9. Вот мой источник:
#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process
from sys import stdout
class Quest():
def __init__(self):
pass
def doIt(self, i):
return i
class Test(Process):
def __init__(self, arg):
super(Test, self).__init__()
self.arg = arg
self.pool = Pool()
def run(self):
quest = Quest()
done = self.pool.map_async(quest.doIt, range(10), error_callback=print)
stdout.flush()
self.arg = [item for item in done.get()]
def __str__(self):
return str(self.arg)
# I tried both with and without this method
def join(self, timeout=None):
self.pool.close()
self.pool.join()
super(Test, self).join(timeout)
test = Test("test")
print(test) # should print 'test' (and does)
test.start()
# this line hangs forever
_ = test.join()
print(test) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'
Это довольно грубая модель того, что я хочу, чтобы моя настоящая программа делала. Проблема, как указано в комментариях, в том, что Test.join всегда зависает вечно. Это полностью не зависит от того, переопределен ли этот метод в классе Test. Он также никогда ничего не печатает, но вывод, когда я отправляю сигнал KeyboardInterrupt, указывает на то, что проблема заключается в получении результатов от рабочих:
test
^CTraceback (most recent call last):
File "./test.py", line 44, in <module>
Process Test-1:
_ = test.join()
File "./test.py", line 34, in join
super(Test, self).join(timeout)
File "/path/to/multiprocessing/process.py", line 124, in join
res = self._popen.wait(timeout)
File "/path/to/multiprocessing/popen_fork.py", line 51, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/path/to/multiprocessing/popen_fork.py", line 29, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
File "/path/to/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "./test.py", line 25, in run
self.arg = [item for item in done.get()]
File "/path/to/multiprocessing/pool.py", line 638, in get
self.wait(timeout)
File "/path/to/multiprocessing/pool.py", line 635, in wait
self._event.wait(timeout)
File "/path/to/threading.py", line 551, in wait
signaled = self._cond.wait(timeout)
File "/path/to/threading.py", line 295, in wait
waiter.acquire()
KeyboardInterrupt
Почему глупый процесс не тупо завершается? Единственное, что делает рабочий, - это однократное разыменование и вызов функции, выполняющий одну операцию, это должно быть очень просто.
Я забыл упомянуть: это отлично работает, если я сделаю Test подклассом threading.Thread вместо multiprocessing.Process. Я действительно не уверен, почему это разбивает его пополам.






Ваша цель - выполнить эту работу асинхронно. Почему бы не порождать асинхронные рабочие процессы подпроцесса из вашего основного процесса БЕЗ порождения дочернего процесса (класс Test)? Результаты будут доступны в вашем основном процессе, и вам не нужно будет ничего делать. Вы можете прекратить читать здесь, если решите это сделать. В противном случае читайте дальше.
Ваше соединение работает вечно, потому что есть два отдельных пула: один при создании объекта процесса (локальный для вашего основного процесса), а другой, когда вы разветвляете процесс, вызывая process.start () (локальный для порожденного процесса)
Например, это не работает:
def __init__(self, arg, shared):
super(Test, self).__init__()
self.arg = arg
self.quest = Quest()
self.shared = shared
self.pool = Pool()
def run(self):
iterable = list(range(10))
self.shared.extend(self.pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
print("1" + str(self.shared))
self.pool.close()
Однако это работает:
def __init__(self, arg, shared):
super(Test, self).__init__()
self.arg = arg
self.quest = Quest()
self.shared = shared
def run(self):
pool = Pool()
iterable = list(range(10))
self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
print("1" + str(self.shared))
pool.close()
Это связано с тем фактом, что когда вы создаете процесс, весь код, стек и сегменты кучи вашего процесса клонируются в процесс, так что ваш основной процесс и подпроцесс имеют разные контексты.
Итак, вы вызываете join () для объекта пула, созданного локально для вашего основного процесса, и который вызывает close () для пула. Затем в run () есть еще один объект пула, который был клонирован в подпроцесс при вызове start (), и этот пул никогда не был закрыт и не может быть присоединен таким образом, как вы это делаете. Проще говоря, ваш основной процесс не имеет ссылки на клонированный объект пула в подпроцессе.
This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half.
Имеет смысл, потому что потоки отличаются от процессов тем, что они имеют независимые стеки вызовов, но совместно используют другие сегменты памяти, поэтому любые обновления, которые вы делаете для объекта, созданного в другом потоке, видны в вашем основном процессе (который является родительским для этих потоков. ) и наоборот.
Решением является создание объекта пула, локального для функции run (). Закройте объект пула в контексте подпроцесса и присоединитесь к подпроцессу в основном процессе. Что подводит нас к пункту 2 ...
Если кто-то более опытный с Manager () захочет вмешаться в его внутренности, это было бы круто. Но следующий код дает вам ожидаемое поведение:
#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process, Manager
from sys import stdout
class Quest():
def __init__(self):
pass
def doIt(self, i):
return i
class Test(Process):
def __init__(self, arg, shared):
super(Test, self).__init__()
self.arg = arg
self.quest = Quest()
self.shared = shared
def run(self):
with Pool() as pool:
iterable = list(range(10))
self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
print("1" + str(self.shared)) # can remove, just to make sure we've updated state
def __str__(self):
return str(self.arg)
with Manager() as manager:
res = manager.list()
test = Test("test", res)
print(test) # should print 'test' (and does)
test.start()
test.join()
print("2" + str(res)) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'
Выходы:
rpg711$ python multiprocess_async_join.py
test
1[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Для тех, кто наткнулся на это: я не могу засвидетельствовать использование Manager, потому что я решил использовать вместо них Pipe, но бит о дублировании пула между процессами привел меня к моему решению.
Спасибо за ответ, я все еще проверяю это и вернусь к этому, когда узнаю, что моя проблема решена. Вы спросили, почему я потрудился создать еще один процесс: этот MCVE не совсем отражает весь объем того, что я пытаюсь выполнить. Я создаю около 500 сложных соединений, используя по 6 сокетов каждое, и выполняю некоторые вычисления с отправленными и полученными данными. Когда я попробовал это, используя только потоки, я обнаружил, что с достаточно высокой задержкой я никогда не смогу распечатать результаты, потому что узлы с более низкой задержкой всегда будут иметь приоритет для GIL. Следовательно, процесс с привязкой к вычислению, который использует рабочих с привязкой к io