Я пытаюсь использовать многопроцессорность для выполнения нескольких фоновых заданий и использовать основной процесс в качестве пользовательского интерфейса, который принимает команды через input(). Каждый процесс должен выполнять определенные задания и записывать свой текущий статус в словарь, который был создан с помощью manager.dict() и затем передан процессу.
После создания процессов существует цикл с input() для доступа к командам пользователя. Команды здесь сведены к минимуму для простоты.
from multiprocessing import Manager
from multiprocessing import Process
with Manager() as manager:
producers = []
settings = [{'name':'test'}]
for setting in settings:
status = manager.dict()
logger.info("Start Producer {0}".format(setting['name']))
producer = Process(target=start_producer, args=(setting, status))
producer.start()
producers.append([producer, status])
logger.info("initialized {0} producers".format(len(producers)))
while True:
text_command = input('Enter your command:')
if text_command == 'exit':
logger.info("waiting for producers")
for p in producers:
p[0].join()
logger.info("Exit application.")
break
elif text_command == 'status':
for p in producers:
if 'name' in p[1] and 'status' in p[1]:
print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
else:
print("Unknown command.")
Метод, который работает в других процессах, довольно прост:
def start_producer(producer_setting: dict, status_dict: dict):
importer = MyProducer(producer_setting)
importer.set_status_dict(status_dict)
importer.run()
Я создаю экземпляр MyProducer и устанавливаю словарь состояния через установщик объекта и вызываю блокирующий метод run(), который вернется только тогда, когда производитель закончит работу. При вызове set_status_dict(status_dict) словарь заполняется элементами name и status.
Когда я запускаю код, создается впечатление, что производитель создается, я получаю вывод «Запуск теста производителя» и «инициализированный 1 производитель», а затем запрос «Введите команду» от input(), но кажется, что на самом деле процесс не работает. не убегаю.
Когда я нажимаю клавишу ВВОД, чтобы пропустить первую итерацию цикла, я получаю ожидаемый журнал «неизвестных команд», и процесс-производитель начинает фактическую работу. После этого моя команда "status" также работает должным образом.
Когда я ввожу «статус» в первой итерации цикла, я получаю ошибку ключа, потому что «имя» и «статус» не установлены в словаре. Эти ключи должны быть установлены в set_status_dict(), который сам называется Process(target=...).
Это почему? Разве производитель.start () не должен запускать полный блок start_producer внутри нового процесса и, следовательно, никогда не зависать на input() основного процесса?
Как я могу сначала запустить процессы без какого-либо участия пользователя и только потом ждать input()?
Редактировать: Полную программу mvce с этой проблемой можно найти здесь: https://pastebin.com/k8xvhLhn
Редактировать: Решение с sleep(1) после инициализации процессов найдено. Но почему вообще происходит такое поведение? Не следует ли запускать весь код в start_producer() в новом процессе?
Я не хочу делиться input(). другие процессы должны работать сами по себе, не взаимодействуя с вводом ..
Я (полностью) заменил комплект start_producer на status_dict['name'] = 'foo'; status_dict['status'] = 'thinking', и все остальное, похоже, работает так, как вы хотите.
Как узнать (почему вы думаете), что это висит на input()?
когда я нажимаю ввод, Unknown command. печатается в консоли, и после этого другой процесс запускается как исключение.
вот полный рабочий пример с этой упомянутой проблемой: pastebin.com/k8xvhLhn






У меня ограниченный опыт работы с модулем многопроцессорности, но мне удалось заставить его вести себя так (я думаю), как вы хотите. Сначала я добавил несколько операторов печати в верхней части цикла while, чтобы увидеть, что может происходить, и обнаружил, что если процесс был run или joined, он работал. Я подумал, что вы не хотите, чтобы он блокировался, поэтому я добавил вызов, чтобы продолжить процесс, но похоже, что run() также блокируется. Оказалось, что процесс просто не был завершен, когда произошла первая итерация цикла while - добавление time.sleep(30) в верхней части цикла дало процессу достаточно времени, чтобы запланировать его (ОС) и запустить. (На моей машине на самом деле требуется от 200 до 300 миллисекунд дремоты)
Я заменил start_producer на:
def start_producer(producer_setting: dict, status_dict: dict):
## importer = MyProducer(producer_setting)
## importer.set_status_dict(status_dict)
## importer.run()
#time.sleep(30)
status_dict['name'] = 'foo'
status_dict['status'] = 'thinking'
Ваш код изменен:
if __name__ == '__main__':
with Manager() as manager:
producers = []
settings = [{'name':'test'}]
for setting in settings:
status = manager.dict()
logger.info("Start Producer {0}".format(setting['name']))
producer = Process(target=start_producer, args=(setting, status))
producer.start()
# add a call to run() but it blocks
#producer.run()
producers.append([producer, status])
logger.info("initialized {0} producers".format(len(producers)))
while True:
time.sleep(30)
for p, s in producers:
#p.join()
#p.run()
print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
if 'name' in s and 'status' in s:
print('{0}:{1}'.format(s['name'], s['status']))
text_command = input('Enter your command:')
if text_command == 'exit':
logger.info("waiting for producers")
for p in producers:
p[0].join()
logger.info("Exit application.")
break
elif text_command == 'status':
for p in producers:
if 'name' in p[1] and 'status' in p[1]:
print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
else:
print("Unknown command.")
эти производственные процессы будут работать долгое время. это была причина, по которой я ввел их в процесс в первую очередь. и да, блоки run (), этот метод должен вызываться не основным процессом, а только что созданным. решение с sleep(1) после инициализации процессов вроде работает нормально, спасибо. Но мне любопытно, почему это происходит. Я действительно думал, что весь блок start_producer() уже запущен в другом процессе и не должен блокироваться. Я не понимаю, почему это происходит.
На мой взгляд, все это упражнение не заблокировано, просто у него не было времени ни запустить, ни закончить. Я действительно не знаю, как обновляется manager.dict, если он динамический и происходит во время работы другого процесса или если он обновляется только после завершения процесса, даже если это первый оператор (я): я действительно думаю это происходит в процессе во время выполнения кода.
@ JackO'neill Здесь много вопросов о многопроцессорной обработке, которые содержат ответы / комментарии, ссылающиеся на много накладных расходов для запуска другого процесса. Я полагаю, что накладные расходы включают в себя планирование ОС для задачи, которую вы не контролируете.
хорошо, это имеет смысл. Я не против немного подождать, прежде чем продолжить основной поток, было просто любопытно, почему. Спасибо за вашу помощь :)
@ JackO'neill Если вам действительно интересно, вы можете добавить несколько операторов time.time () в основной и подпроцесс (возможно, еще один элемент в status) и потом сравнить их.
вы не можете совместно использовать input () между процессами.