Блоки python input () с несколькими процессами

Я пытаюсь использовать многопроцессорность для выполнения нескольких фоновых заданий и использовать основной процесс в качестве пользовательского интерфейса, который принимает команды через input(). Каждый процесс должен выполнять определенные задания и записывать свой текущий статус в словарь, который был создан с помощью manager.dict() и затем передан процессу.

После создания процессов существует цикл с input() для доступа к командам пользователя. Команды здесь сведены к минимуму для простоты.

from multiprocessing import Manager
from multiprocessing import Process
with Manager() as manager:
    producers = []
    settings = [{'name':'test'}]

    for setting in settings:
        status = manager.dict()
        logger.info("Start Producer {0}".format(setting['name']))
        producer = Process(target=start_producer, args=(setting, status))
        producer.start()
        producers.append([producer, status])

    logger.info("initialized {0} producers".format(len(producers)))

    while True:
        text_command = input('Enter your command:')
        if text_command == 'exit':
            logger.info("waiting for producers")
            for p in producers:
                p[0].join()
            logger.info("Exit application.")
            break
        elif text_command == 'status':
            for p in producers:
                if 'name' in p[1] and 'status' in p[1]:
                    print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
        else:
            print("Unknown command.")

Метод, который работает в других процессах, довольно прост:

def start_producer(producer_setting: dict, status_dict: dict):
    importer = MyProducer(producer_setting)
    importer.set_status_dict(status_dict)
    importer.run()

Я создаю экземпляр MyProducer и устанавливаю словарь состояния через установщик объекта и вызываю блокирующий метод run(), который вернется только тогда, когда производитель закончит работу. При вызове set_status_dict(status_dict) словарь заполняется элементами name и status.

Когда я запускаю код, создается впечатление, что производитель создается, я получаю вывод «Запуск теста производителя» и «инициализированный 1 производитель», а затем запрос «Введите команду» от input(), но кажется, что на самом деле процесс не работает. не убегаю.

Когда я нажимаю клавишу ВВОД, чтобы пропустить первую итерацию цикла, я получаю ожидаемый журнал «неизвестных команд», и процесс-производитель начинает фактическую работу. После этого моя команда "status" также работает должным образом.

Когда я ввожу «статус» в первой итерации цикла, я получаю ошибку ключа, потому что «имя» и «статус» не установлены в словаре. Эти ключи должны быть установлены в set_status_dict(), который сам называется Process(target=...).

Это почему? Разве производитель.start () не должен запускать полный блок start_producer внутри нового процесса и, следовательно, никогда не зависать на input() основного процесса?

Как я могу сначала запустить процессы без какого-либо участия пользователя и только потом ждать input()?

Редактировать: Полную программу mvce с этой проблемой можно найти здесь: https://pastebin.com/k8xvhLhn

Редактировать: Решение с sleep(1) после инициализации процессов найдено. Но почему вообще происходит такое поведение? Не следует ли запускать весь код в start_producer() в новом процессе?

вы не можете совместно использовать input () между процессами.

Jean-François Fabre 04.04.2018 16:37

Я не хочу делиться input(). другие процессы должны работать сами по себе, не взаимодействуя с вводом ..

Jack O'neill 04.04.2018 16:38

Я (полностью) заменил комплект start_producer на status_dict['name'] = 'foo'; status_dict['status'] = 'thinking', и все остальное, похоже, работает так, как вы хотите.

wwii 04.04.2018 17:19

Как узнать (почему вы думаете), что это висит на input()?

wwii 04.04.2018 17:22

когда я нажимаю ввод, Unknown command. печатается в консоли, и после этого другой процесс запускается как исключение.

Jack O'neill 04.04.2018 17:23

вот полный рабочий пример с этой упомянутой проблемой: pastebin.com/k8xvhLhn

Jack O'neill 04.04.2018 17:28
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
6
144
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

У меня ограниченный опыт работы с модулем многопроцессорности, но мне удалось заставить его вести себя так (я думаю), как вы хотите. Сначала я добавил несколько операторов печати в верхней части цикла while, чтобы увидеть, что может происходить, и обнаружил, что если процесс был run или joined, он работал. Я подумал, что вы не хотите, чтобы он блокировался, поэтому я добавил вызов, чтобы продолжить процесс, но похоже, что run() также блокируется. Оказалось, что процесс просто не был завершен, когда произошла первая итерация цикла while - добавление time.sleep(30) в верхней части цикла дало процессу достаточно времени, чтобы запланировать его (ОС) и запустить. (На моей машине на самом деле требуется от 200 до 300 миллисекунд дремоты)

Я заменил start_producer на:

def start_producer(producer_setting: dict, status_dict: dict):
##    importer = MyProducer(producer_setting)
##    importer.set_status_dict(status_dict)
##    importer.run()
    #time.sleep(30)
    status_dict['name'] = 'foo'
    status_dict['status'] = 'thinking'

Ваш код изменен:

if __name__ == '__main__':
    with Manager() as manager:
        producers = []
        settings = [{'name':'test'}]

        for setting in settings:
            status = manager.dict()
            logger.info("Start Producer {0}".format(setting['name']))
            producer = Process(target=start_producer, args=(setting, status))
            producer.start()
            # add a call to run() but it blocks
            #producer.run()
            producers.append([producer, status])

        logger.info("initialized {0} producers".format(len(producers)))

        while True:
            time.sleep(30)
            for p, s in producers:
                #p.join()
                #p.run()
                print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
                if 'name' in s and 'status' in s:
                    print('{0}:{1}'.format(s['name'], s['status']))
            text_command = input('Enter your command:')
            if text_command == 'exit':
                logger.info("waiting for producers")
                for p in producers:
                    p[0].join()
                logger.info("Exit application.")
                break
            elif text_command == 'status':
                for p in producers:
                    if 'name' in p[1] and 'status' in p[1]:
                        print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
            else:
                print("Unknown command.")

эти производственные процессы будут работать долгое время. это была причина, по которой я ввел их в процесс в первую очередь. и да, блоки run (), этот метод должен вызываться не основным процессом, а только что созданным. решение с sleep(1) после инициализации процессов вроде работает нормально, спасибо. Но мне любопытно, почему это происходит. Я действительно думал, что весь блок start_producer() уже запущен в другом процессе и не должен блокироваться. Я не понимаю, почему это происходит.

Jack O'neill 05.04.2018 10:47

На мой взгляд, все это упражнение не заблокировано, просто у него не было времени ни запустить, ни закончить. Я действительно не знаю, как обновляется manager.dict, если он динамический и происходит во время работы другого процесса или если он обновляется только после завершения процесса, даже если это первый оператор (я): я действительно думаю это происходит в процессе во время выполнения кода.

wwii 05.04.2018 16:08

@ JackO'neill Здесь много вопросов о многопроцессорной обработке, которые содержат ответы / комментарии, ссылающиеся на много накладных расходов для запуска другого процесса. Я полагаю, что накладные расходы включают в себя планирование ОС для задачи, которую вы не контролируете.

wwii 05.04.2018 16:18

хорошо, это имеет смысл. Я не против немного подождать, прежде чем продолжить основной поток, было просто любопытно, почему. Спасибо за вашу помощь :)

Jack O'neill 05.04.2018 16:22

@ JackO'neill Если вам действительно интересно, вы можете добавить несколько операторов time.time () в основной и подпроцесс (возможно, еще один элемент в status) и потом сравнить их.

wwii 05.04.2018 16:40

Другие вопросы по теме