Объект процесса Python3 никогда не присоединяется

Позвольте мне начать с того, что я не использую очередь, поэтому этот вопрос не является дубликатом Вот этот, и я не использую пул процессов, поэтому это не дубликат Вот этот.

У меня есть объект Process, который использует пул рабочих потоков для выполнения некоторой задачи. Ради MCVE эта задача просто создает список целых чисел от 0 до 9. Вот мой источник:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):

    def __init__(self, arg):
        super(Test, self).__init__()
        self.arg = arg
        self.pool = Pool()

    def run(self):
        quest = Quest()
        done = self.pool.map_async(quest.doIt, range(10), error_callback=print)
        stdout.flush()

        self.arg = [item for item in done.get()]

    def __str__(self):
        return str(self.arg)

    # I tried both with and without this method
    def join(self, timeout=None):
        self.pool.close()
        self.pool.join()
        super(Test, self).join(timeout)


test = Test("test")

print(test) # should print 'test' (and does)

test.start()

# this line hangs forever
_ = test.join()

print(test) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

Это довольно грубая модель того, что я хочу, чтобы моя настоящая программа делала. Проблема, как указано в комментариях, в том, что Test.join всегда зависает вечно. Это полностью не зависит от того, переопределен ли этот метод в классе Test. Он также никогда ничего не печатает, но вывод, когда я отправляю сигнал KeyboardInterrupt, указывает на то, что проблема заключается в получении результатов от рабочих:

test
^CTraceback (most recent call last):
  File "./test.py", line 44, in <module>
Process Test-1:
    _ = test.join()
  File "./test.py", line 34, in join
    super(Test, self).join(timeout)
  File "/path/to/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/path/to/multiprocessing/popen_fork.py", line 51, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/path/to/multiprocessing/popen_fork.py", line 29, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/path/to/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "./test.py", line 25, in run
    self.arg = [item for item in done.get()]
  File "/path/to/multiprocessing/pool.py", line 638, in get
    self.wait(timeout)
  File "/path/to/multiprocessing/pool.py", line 635, in wait
    self._event.wait(timeout)
  File "/path/to/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/path/to/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

Почему глупый процесс не тупо завершается? Единственное, что делает рабочий, - это однократное разыменование и вызов функции, выполняющий одну операцию, это должно быть очень просто.

Я забыл упомянуть: это отлично работает, если я сделаю Test подклассом threading.Thread вместо multiprocessing.Process. Я действительно не уверен, почему это разбивает его пополам.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
3
0
768
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
  1. Ваша цель - выполнить эту работу асинхронно. Почему бы не порождать асинхронные рабочие процессы подпроцесса из вашего основного процесса БЕЗ порождения дочернего процесса (класс Test)? Результаты будут доступны в вашем основном процессе, и вам не нужно будет ничего делать. Вы можете прекратить читать здесь, если решите это сделать. В противном случае читайте дальше.

  2. Ваше соединение работает вечно, потому что есть два отдельных пула: один при создании объекта процесса (локальный для вашего основного процесса), а другой, когда вы разветвляете процесс, вызывая process.start () (локальный для порожденного процесса)

Например, это не работает:

def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared
    self.pool = Pool()

def run(self):
    iterable = list(range(10))
    self.shared.extend(self.pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    self.pool.close()

Однако это работает:

def __init__(self, arg, shared):
    super(Test, self).__init__()
    self.arg = arg
    self.quest = Quest()
    self.shared = shared

def run(self):
    pool = Pool()
    iterable = list(range(10))
    self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
    print("1" + str(self.shared))
    pool.close()

Это связано с тем фактом, что когда вы создаете процесс, весь код, стек и сегменты кучи вашего процесса клонируются в процесс, так что ваш основной процесс и подпроцесс имеют разные контексты.

Итак, вы вызываете join () для объекта пула, созданного локально для вашего основного процесса, и который вызывает close () для пула. Затем в run () есть еще один объект пула, который был клонирован в подпроцесс при вызове start (), и этот пул никогда не был закрыт и не может быть присоединен таким образом, как вы это делаете. Проще говоря, ваш основной процесс не имеет ссылки на клонированный объект пула в подпроцессе.

This works fine if I make Test a subclass of threading.Thread instead of multiprocessing.Process. I'm really not sure why this breaks it in half.

Имеет смысл, потому что потоки отличаются от процессов тем, что они имеют независимые стеки вызовов, но совместно используют другие сегменты памяти, поэтому любые обновления, которые вы делаете для объекта, созданного в другом потоке, видны в вашем основном процессе (который является родительским для этих потоков. ) и наоборот.

Решением является создание объекта пула, локального для функции run (). Закройте объект пула в контексте подпроцесса и присоединитесь к подпроцессу в основном процессе. Что подводит нас к пункту 2 ...

  1. Совместное состояние: есть эти объекты multiprocessing.Manager (), которые допускают какое-то волшебное безопасное совместное состояние между процессами. Не похоже, что менеджер позволяет переназначать ссылки на объекты, что имеет смысл, потому что, если вы переназначаете управляемое значение в подпроцессе, когда подпроцесс завершается, этот контекст процесса (код, стек, куча) исчезает, и ваш основной процесс никогда не видит это присвоение (поскольку оно было сделано с помощью ссылки на объект, локальный в контексте подпроцесса). Однако это может работать для примитивных значений ctype.

Если кто-то более опытный с Manager () захочет вмешаться в его внутренности, это было бы круто. Но следующий код дает вам ожидаемое поведение:

#!/usr/bin/env python3
from multiprocessing.pool import ThreadPool as Pool
from multiprocessing import Process, Manager
from sys import stdout

class Quest():
    def __init__(self):
        pass

    def doIt(self, i):
        return i

class Test(Process):

    def __init__(self, arg, shared):
        super(Test, self).__init__()
        self.arg = arg
        self.quest = Quest()
        self.shared = shared

    def run(self):
        with Pool() as pool:
            iterable = list(range(10))
            self.shared.extend(pool.map_async(self.quest.doIt, iterable, error_callback=print).get())
            print("1" + str(self.shared)) # can remove, just to make sure we've updated state

    def __str__(self):
        return str(self.arg)

with Manager() as manager:
    res = manager.list()
    test = Test("test", res)

    print(test) # should print 'test' (and does)

    test.start()
    test.join()

    print("2" + str(res)) # should print '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]'

Выходы:

rpg711$ python multiprocess_async_join.py 
test
1[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
2[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Спасибо за ответ, я все еще проверяю это и вернусь к этому, когда узнаю, что моя проблема решена. Вы спросили, почему я потрудился создать еще один процесс: этот MCVE не совсем отражает весь объем того, что я пытаюсь выполнить. Я создаю около 500 сложных соединений, используя по 6 сокетов каждое, и выполняю некоторые вычисления с отправленными и полученными данными. Когда я попробовал это, используя только потоки, я обнаружил, что с достаточно высокой задержкой я никогда не смогу распечатать результаты, потому что узлы с более низкой задержкой всегда будут иметь приоритет для GIL. Следовательно, процесс с привязкой к вычислению, который использует рабочих с привязкой к io

anoneemus 21.03.2018 17:24

Для тех, кто наткнулся на это: я не могу засвидетельствовать использование Manager, потому что я решил использовать вместо них Pipe, но бит о дублировании пула между процессами привел меня к моему решению.

anoneemus 21.03.2018 18:19

Другие вопросы по теме