У меня следующая проблема: у меня есть словарь с несколькими сотнями ключей (все еще около 150 МБ), каждый со сложным значением, содержащим словари, списки, отдельные значения. У меня есть 3 потока входящей информации с таймингами 1 с, 0,1 с и в реальном времени, в зависимости от типа данных. Чтобы ускорить обработку данных, я хочу использовать многопроцессорную обработку, чтобы создать 3 процесса для разных источников, желательно, чтобы каждый процесс имел свой собственный пул для дальнейшего ускорения.
Проблема в том, как «нарезать» общий дикт на обновляемые части. Мне кажется, что используя пул или процесс, я должен определить список аргументов в начале, когда я инициализирую процесс/пул. Моей задаче потребуется что-то вроде этого: я получаю сообщение о том, что ключ «A» необходимо обновить. Я назначаю работника для его обновления, передавая сообщение, содержащее новую информацию и сложный объект "A" (или, по крайней мере, соответствующее значение "A"). Я определенно не хочу передавать весь дикт каждому рабочему, потому что он использует много памяти.
В этом примере кода я хотел бы передать general_dict['А']['а'] только при обработке первого элемента пример_данных_а, general_dict['B']['a'] для третьего и так далее. То же самое для example_data_b. Как мне передать аргументы?
general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
'B': {'a': [3, 4, 5], 'b': 'test2'},
'C': {'a': [6, 7, 8], 'b': 'test3'}},
example_data_a = ['A', [2,1,2],
'A', [2,3,2],
'B', [3,0,5],
'C', [6,1,8]]
example_data_b = ['A', 'test11',
'B', 'test21',
'B', 'test22',
'C', 'test31']
def update_a(x):
...
def update_b(y):
...
if __name__ == "__main__":
p1 = multiprocessing.Process(target = update_a)
p2 = multiprocessing.Process(target = update_b)
p1.start()
p2.start()
p1.join()
p2.join()
Это зависит от данных, в крайних случаях я изменяю только первое значение списка или перебираю список из 1000 элементов 60-70 раз для одного обновления. В среднем размер моего буфера данных увеличивается на 5-10 000 за несколько минут, поэтому для одноядерной обработки это медленно.
Как в настоящее время обрабатываются три потока? Вы не показываете этого, и было бы полезно, если бы вы обрисовали это в общих чертах.
да, похоже, что эта структура данных не идеальна. возможно, предварительная обработка его один раз в более доступный решит проблему. из вашего вопроса немного неясно, как вы хотите сгруппировать свои данные. я думаю, если вы уточните, что в вашей голове код не будет сильно отставать.
после организации ваших данных потом вы можете беспокоиться о добавлении к ним многопроцессорной обработки. (это довольно просто - проверьте ProcessPoolExecutor)
@Booboo Три потока последовательно обрабатываются в заданных точках цикла while, где есть периоды ожидания и периоды управления. В периоды ожидания я обрабатываю все три потока пакетами. Затем по результатам контрольный период выдает команды. Это уже более 2000 строк кода, поэтому я хочу проиллюстрировать это на простом примере. Моя идея здесь заключается в том, что должен быть процесс, который обрабатывает поток1, один для потока2 и один для потока3, в то время как один процесс выполняет управление. Но для этого мне нужны только значения для данного ключа, у которого есть новая информация.
@acushner Это может случиться, я инженер, а не разработчик программного обеспечения. Я держу всю информацию по одному ключу, устройству, если хотите, в одном месте: списки параметров, статистику прошлых дней и матрицу актуальных исторических значений. Что требует наибольшего времени обработки, так это 2 столбца, различные (около 1000) строки, упорядоченная матрица параметров, где я должен добавлять, изменять или удалять строки. Было бы достаточно передать эту матрицу для функции обработки, но как мне модифицировать аргумент, передаваемый процессу каждый раз, ведь я получаю ключ только перед вызовом функции?
Я понял твою идею. Но проблема в том, что все возможные ключи могут проходить через все три потока, так что это не похоже на самый рабочий подход. Мне кажется, у вас должен быть один процесс, обрабатывающий входной поток. Более того, не должно быть необходимости разбивать словарь. Вместо этого у вас есть три процесса, которые обрабатывают треть ключей, как вы предполагали. Каждый процесс запускается в начале, и ему передается собственный экземпляр multiprocessing.Queue
в качестве входной очереди, и все они передаются в общую очередь результатов для возврата возвращаемого значения. Поток, запущенный основным процессом, постоянно попадает в очередь результатов и обновляет словарь возвращенными значениями.
Это общая идея:
from multiprocessing import Process, Queue
from threading import Thread
def update_a(input_queue, result_queue):
while True:
# Wait for next request:
x = input_queue.get()
if x is None:
# This is a Sentinel indicating a request to terminate.
# Put sentinel to result queue to let the results_thread know
# that there are no more results coming from this process
result_queue.put(None)
return
# Process:
...
# Put result on the result queue:
result_queue.put(result)
def update_b(input_queue, result_queue):
while True:
# Wait for next request:
y = input_queue.get()
if y is None:
# This is a Sentinel indicating a request to terminate.
# Put sentinel to result queue to let the results_thread know
# that there are no more results coming from this process
result_queue.put(None)
return
# Process:
...
# Put result on the result queue:
result_queue.put(result)
def update_c(input_queue, result_queue):
while True:
# Wait for next request:
z = input_queue.get()
if x is None:
# This is a Sentinel indicating a request to terminate.
# Put sentinel to result queue to let the results_thread know
# that there are no more results coming from this process
result_queue.put(None)
return
# Process:
...
# Put result on the result queue:
result_queue.put(result)
def process_results():
sentinels_seen = 0
# Have all 3 processes finished?
while sentinels_seen < 3:
# Get next result
result = result_queue.get()
if result is None:
# Sentinel
sentinels_seen += 1
else:
# Update general_dict with result:
...
def process_input_stream():
while True:
# When we have decided that we are through processing input
# break out of the loop:
if through_processing:
break
# Get input from one of 3 sources and depending on key
# put the "argument" to either a_q, b_q or c_q to be handled respectively
# by either update_a, update_b or update_c.
# The result will be put to result queue which will be processed by our
# process_results thread.
...
# Add a sentinel to each of the input queues:
a_q.put(None)
b_q.put(None)
c_q.put(None)
if __name__ == "__main__":
# Building the general_dict should be protected by if __name__ == "__main__":
general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
'B': {'a': [3, 4, 5], 'b': 'test2'},
'C': {'a': [6, 7, 8], 'b': 'test3'}}
a_q, b_q, c_q = Queue(), Queue(), Queue()
result_queue = Queue()
p1 = Process(target=update_a, args=(a_q, result_queue))
p2 = Process(target=update_b, args=(b_q, result_queue))
p3 = Process(target=update_c, args=(c_q, result_queue))
t = Thread(target=process_results)
p1.start()
p2.start()
p3.start()
process_input_stream()
p1.join()
p2.join()
p3.join()
t.join()
Примечание:
Если вы обнаружите, что между потоком process_results
и циклом process_input_stream
слишком много конфликтов из-за GIL, который не позволяет последнему идти в ногу с входным потоком, не запускайте и не присоединяйтесь к process_results
потоку. Вместо этого просто запустите и соедините три процесса, как раньше, а затем, наконец, вызовprocess_results
как функцию основного процесса. Вы, конечно, потеряете любой параллелизм таким образом:
if __name__ == "__main__":
# Building the general_dict should be protected by if __name__ == "__main__":
general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
'B': {'a': [3, 4, 5], 'b': 'test2'},
'C': {'a': [6, 7, 8], 'b': 'test3'}}
a_q, b_q, c_q = Queue(), Queue(), Queue()
result_queue = Queue()
p1 = Process(target=update_a, args=(a_q, result_queue))
p2 = Process(target=update_b, args=(b_q, result_queue))
p3 = Process(target=update_c, args=(c_q, result_queue))
p1.start()
p2.start()
p3.start()
process_input_stream()
p1.join()
p2.join()
p3.join()
process_results()
Спасибо, это похоже на хороший подход. Я немного модифицировал его для тестирования с большим количеством данных и таймером, он закончился примерно вдвое быстрее, чем последовательная обработка. Это модифицированная версия, которую я использовал: pastebin.com/bTz7Kg97
сколько времени это занимает в настоящее время без многопроцессорности? несколько сотен ключей - это не так уж и много...