У меня есть [абстрактный] многопроцессорный скрипт ниже, где я пытаюсь:
1) разделить рабочую нагрузку между двумя процессами (добавить 1 к каждой переменной в списке + добавить новую переменную в новый список)
2) Объедините выходные данные обоих процессов в новый глобальный список для дальнейшей обработки.
Любая идея о том, как я могу получить выходные данные обоих процессов и объединить эти выходные данные в глобальный список? Что я хочу получить после выполнения:
new_id_list = [2, 4, 6, 8, 10, 3, 5, 7, 9, 11] #new_id_list от worker1 + new_id_list от worker2
#python2
from multiprocessing import Process, Queue
from time import sleep
#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]
#new output id list
new_id_list = []
queue = Queue()
def mp_worker(queue):
while queue.qsize() >0 :
record = queue.get()
new_id_list.append(record+1)
sleep(.1)
print(new_id_list)
###how would I go about passing this new_id_list as the global variable
print("worker closed")
def mp_handler():
# Spawn two processes, assigning the method to be executed
# and the input arguments (the queue)
processes = [Process(target=mp_worker, args=(queue,)) for _ in range(2)]
for process in processes:
process.start()
print('Process started')
for process in processes:
process.join()
if __name__ == '__main__':
for id in id_list:
queue.put(id)
mp_handler()
Я предполагаю, что проблема, с которой вы столкнулись, заключалась в невозможности обмена обоими процессами new_id_list
.
Что вам нужно сделать, так это создать еще один Queue
, который будет представлять очередь результатов и передавать ее обоим процессам. Добавляйте в очередь по мере необходимости внутри процессов, и в конце выполнения обоих процессов (после process.join()
) вы просто извлекаете все из очереди в список.
Найден статья здесь относительно решения.
Рабочий код ниже. В принципе:
1) мы используем multiprocessing.Manager()
2) создать список с помощью менеджера
3) Передайте список каждому рабочему, затем пусть каждый рабочий добавит вывод обратно в список.
from multiprocessing import Process, Queue
from time import sleep
import multiprocessing
#records to process
id_list = [1,2,3,4,5,6,7,8,9,10]
#new output id list
new_id_list = []
queue = Queue()
def mp_worker(d,queue):
while queue.qsize() >0 :
record = queue.get()
new_id_list.append(record+1)
d.append(record+1)
sleep(.1)
print(new_id_list)
print("worker closed")
def mp_handler():
# Spawn two processes, assigning the method to be executed
# and the input arguments (the queue)
processes = [Process(target=mp_worker, args=(d,queue,)) for _ in range(2)]
for process in processes:
process.start()
print('Process started')
for process in processes:
process.join()
if __name__ == '__main__':
mgr = multiprocessing.Manager()
d = mgr.list()
for id in id_list:
queue.put(id)
mp_handler()
print(d) #
в итоге сделал что-то подобное, используя multiprocessing.Manager(), а затем создал список с помощью менеджера.