Динамический аргумент для многопроцессорности

У меня следующая проблема: у меня есть словарь с несколькими сотнями ключей (все еще около 150 МБ), каждый со сложным значением, содержащим словари, списки, отдельные значения. У меня есть 3 потока входящей информации с таймингами 1 с, 0,1 с и в реальном времени, в зависимости от типа данных. Чтобы ускорить обработку данных, я хочу использовать многопроцессорную обработку, чтобы создать 3 процесса для разных источников, желательно, чтобы каждый процесс имел свой собственный пул для дальнейшего ускорения.

Проблема в том, как «нарезать» общий дикт на обновляемые части. Мне кажется, что используя пул или процесс, я должен определить список аргументов в начале, когда я инициализирую процесс/пул. Моей задаче потребуется что-то вроде этого: я получаю сообщение о том, что ключ «A» необходимо обновить. Я назначаю работника для его обновления, передавая сообщение, содержащее новую информацию и сложный объект "A" (или, по крайней мере, соответствующее значение "A"). Я определенно не хочу передавать весь дикт каждому рабочему, потому что он использует много памяти.

В этом примере кода я хотел бы передать general_dict['А']['а'] только при обработке первого элемента пример_данных_а, general_dict['B']['a'] для третьего и так далее. То же самое для example_data_b. Как мне передать аргументы?

general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                'B': {'a': [3, 4, 5], 'b': 'test2'},
                'C': {'a': [6, 7, 8], 'b': 'test3'}},

example_data_a = ['A', [2,1,2],
                  'A', [2,3,2],
                  'B', [3,0,5],
                  'C', [6,1,8]]

example_data_b = ['A', 'test11',
                  'B', 'test21',
                  'B', 'test22',
                  'C', 'test31']

def update_a(x):
    ...

def update_b(y):
    ...

if __name__ == "__main__":
    p1 = multiprocessing.Process(target = update_a)
    p2 = multiprocessing.Process(target = update_b)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

сколько времени это занимает в настоящее время без многопроцессорности? несколько сотен ключей - это не так уж и много...

acushner 18.03.2022 00:09

Это зависит от данных, в крайних случаях я изменяю только первое значение списка или перебираю список из 1000 элементов 60-70 раз для одного обновления. В среднем размер моего буфера данных увеличивается на 5-10 000 за несколько минут, поэтому для одноядерной обработки это медленно.

balu 18.03.2022 07:46

Как в настоящее время обрабатываются три потока? Вы не показываете этого, и было бы полезно, если бы вы обрисовали это в общих чертах.

Booboo 18.03.2022 11:51

да, похоже, что эта структура данных не идеальна. возможно, предварительная обработка его один раз в более доступный решит проблему. из вашего вопроса немного неясно, как вы хотите сгруппировать свои данные. я думаю, если вы уточните, что в вашей голове код не будет сильно отставать.

acushner 18.03.2022 14:50

после организации ваших данных потом вы можете беспокоиться о добавлении к ним многопроцессорной обработки. (это довольно просто - проверьте ProcessPoolExecutor)

acushner 18.03.2022 14:52

@Booboo Три потока последовательно обрабатываются в заданных точках цикла while, где есть периоды ожидания и периоды управления. В периоды ожидания я обрабатываю все три потока пакетами. Затем по результатам контрольный период выдает команды. Это уже более 2000 строк кода, поэтому я хочу проиллюстрировать это на простом примере. Моя идея здесь заключается в том, что должен быть процесс, который обрабатывает поток1, один для потока2 и один для потока3, в то время как один процесс выполняет управление. Но для этого мне нужны только значения для данного ключа, у которого есть новая информация.

balu 18.03.2022 17:53

@acushner Это может случиться, я инженер, а не разработчик программного обеспечения. Я держу всю информацию по одному ключу, устройству, если хотите, в одном месте: списки параметров, статистику прошлых дней и матрицу актуальных исторических значений. Что требует наибольшего времени обработки, так это 2 столбца, различные (около 1000) строки, упорядоченная матрица параметров, где я должен добавлять, изменять или удалять строки. Было бы достаточно передать эту матрицу для функции обработки, но как мне модифицировать аргумент, передаваемый процессу каждый раз, ведь я получаю ключ только перед вызовом функции?

balu 18.03.2022 18:05
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
7
47
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я понял твою идею. Но проблема в том, что все возможные ключи могут проходить через все три потока, так что это не похоже на самый рабочий подход. Мне кажется, у вас должен быть один процесс, обрабатывающий входной поток. Более того, не должно быть необходимости разбивать словарь. Вместо этого у вас есть три процесса, которые обрабатывают треть ключей, как вы предполагали. Каждый процесс запускается в начале, и ему передается собственный экземпляр multiprocessing.Queue в качестве входной очереди, и все они передаются в общую очередь результатов для возврата возвращаемого значения. Поток, запущенный основным процессом, постоянно попадает в очередь результатов и обновляет словарь возвращенными значениями.

Это общая идея:

from multiprocessing import Process, Queue
from threading import Thread


def update_a(input_queue, result_queue):
    while True:
        # Wait for next request:
        x = input_queue.get()
        if x is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)

def update_b(input_queue, result_queue):
    while True:
        # Wait for next request:
        y = input_queue.get()
        if y is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)

def update_c(input_queue, result_queue):
    while True:
        # Wait for next request:
        z = input_queue.get()
        if x is None:
            # This is a Sentinel indicating a request to terminate.
            # Put sentinel to result queue to let the results_thread know
            # that there are no more results coming from this process
            result_queue.put(None)
            return
        # Process:
        ...
        # Put result on the result queue:
        result_queue.put(result)


def process_results():
    sentinels_seen = 0
    # Have all 3 processes finished?
    while sentinels_seen < 3:
        # Get next result
        result = result_queue.get()
        if result is None:
            # Sentinel
            sentinels_seen += 1
        else:
            # Update general_dict with result:
            ...

def process_input_stream():
    while True:
        # When we have decided that we are through processing input
        # break out of the loop:
        if through_processing:
            break
        # Get input from one of 3 sources and depending on key
        # put the "argument" to either a_q, b_q or c_q to be handled respectively
        # by either update_a, update_b or update_c.
        # The result will be put to result queue which will be processed by our
        # process_results thread.
        ...

    # Add a sentinel to each of the input queues:
    a_q.put(None)
    b_q.put(None)
    c_q.put(None)

if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q =  Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    t = Thread(target=process_results)
    p1.start()
    p2.start()
    p3.start()

    process_input_stream()

    p1.join()
    p2.join()
    p3.join()
    t.join()

Примечание:

Если вы обнаружите, что между потоком process_results и циклом process_input_stream слишком много конфликтов из-за GIL, который не позволяет последнему идти в ногу с входным потоком, не запускайте и не присоединяйтесь к process_results потоку. Вместо этого просто запустите и соедините три процесса, как раньше, а затем, наконец, вызовprocess_results как функцию основного процесса. Вы, конечно, потеряете любой параллелизм таким образом:

if __name__ == "__main__":
    # Building the general_dict should be protected by if __name__ == "__main__":
    general_dict = {'A': {'a': [0, 1, 2], 'b': 'test1'},
                    'B': {'a': [3, 4, 5], 'b': 'test2'},
                    'C': {'a': [6, 7, 8], 'b': 'test3'}}
    a_q, b_q, c_q =  Queue(), Queue(), Queue()
    result_queue = Queue()
    p1 = Process(target=update_a, args=(a_q, result_queue))
    p2 = Process(target=update_b, args=(b_q, result_queue))
    p3 = Process(target=update_c, args=(c_q, result_queue))
    p1.start()
    p2.start()
    p3.start()

    process_input_stream()

    p1.join()
    p2.join()
    p3.join()

    process_results()

Спасибо, это похоже на хороший подход. Я немного модифицировал его для тестирования с большим количеством данных и таймером, он закончился примерно вдвое быстрее, чем последовательная обработка. Это модифицированная версия, которую я использовал: pastebin.com/bTz7Kg97

balu 19.03.2022 12:45

Другие вопросы по теме