Многопроцессорная обработка с несколькими объектами классов

У меня есть разные классы Python, и все они имеют метод push. То, что я пытаюсь сделать, это

  1. Создайте несколько объектов класса, по одному из каждого класса.
  2. Вызов метода push
  3. Все эти методы должны выполняться параллельно.
  4. Записывать сообщения, поступающие из метода push, по мере его завершения.
  5. Дождитесь завершения всех методов и выйдите из сценария.

Я пробовал несколько способов, используя

    #from pathos.multiprocessing import ProcessingPool as Pool
    from multiprocessing import Pool
    def push_wrapper(obj):
        obj.push()
    
    
    class_objects=[class1_obj1,class2_obj2,class3_obj3]
    with Pool(processes=max_thread_count) as pool:
        pool.map(push_wrapper, class_objects)
        pool.close()
        pool.join()

При этом я получаю эту ошибку

поднять TypeError(f"невозможно выбрать объект {self.class.name!r}")

Есть и другие подходы, например использование pool.apply_async, но они не ждут немедленного завершения выхода из всех методов. Когда я добавляю job.wait() вместе с pool.apply_async, он ожидает завершения всех потоков, но я хочу напечатать результат потока по мере его завершения.

ошибка, которую мы получаем при запуске вашего кода, имеет вид NameError: name 'class1_obj1' is not defined, godbolt.org/z/EYsfa4EGW , напишите минимальный пример

Ahmed AEK 06.06.2024 15:30

определенные объекты нелегко перенести в другой процесс (например, дескрипторы файлов, каналы, сокеты и т. д.), а содержание этих объектов в качестве атрибутов класса делает невозможным отправку экземпляра класса дочернему процессу для выполнения его метода. . Вы можете настроить сериализацию и десериализацию вашего класса, определив методы __getstate__ и __setstate__, если вы лучше знаете, как реконструировать объект (т.е. сертификаты сертификатов можно реконструировать, а не копировать напрямую).

Aaron 06.06.2024 15:38

Как было объяснено в предыдущих комментариях, вы не можете передать какой-либо собственный класс в новые процессы. Таким образом, вам необходимо реализовать способы десериализации объекта и его повторной сериализации. Одним из решений, которое следует рассмотреть, будет создание json из класса, передача json в пул.map, а затем создание нового экземпляра класса из данного json.

Johnny Cheesecutter 06.06.2024 15:56

Можно ссылку на пример? @ДжонниЧизекаттер

Sandeep Lade 06.06.2024 15:59

Также рассмотрите возможность использования модуля Pool из pathos: from pathos.multiprocessing import ProcessingPool as Pool Он использует dill вместо pickle и, таким образом, лучше справляется с травлением.

Johnny Cheesecutter 06.06.2024 17:03

Кстати, включите соответствующие операторы import в свой фрагмент кода, чтобы ваш пример действительно воспроизводился. (в данном случае импорт класса Pool)

jsbueno 06.06.2024 17:45

@JohnnyCheesecutter я пробовал это раньше, но безуспешно

Sandeep Lade 06.06.2024 17:57

@jsbueno Готово, обновлено

Sandeep Lade 06.06.2024 17:59
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
8
91
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы хотите передать рабочие экземпляры своих классов, а затем вызвать метод push каждого из них. Классы не подлежат сериализации (внутренне Python использует для этого стандартную библиотеку pickle) и не могут передаваться в качестве аргументов рабочим процессам.

С другой стороны, экземпляры обычного класса могут передаваться (обратите внимание, что специальные классы, созданные расширениями в собственном коде, могут не иметь поддержки Pickle - но это можно реализовать - проверьте документацию протокола Pickle) - и тогда все рабочие нужно сделать, это вызвать метод push для каждого.

TL;DR: вы должны заставить его работать, изменив оболочку и одну строку карты:

...
def push_wrapper(push_callable):
    return push_callable()
...
with Pool(...) as pool:
    pool.map(push_wrapper, (cls().push for cls in class_objects))
...

Обратите внимание, что выражение (cls().push for cls in class_objects) является выражением-генератором, то есть итерацией, приемлемой для pool.map, и для каждого элемента в последовательности вашего класса создается экземпляр (cls()) и ссылается на его метод .push как целевой вызываемый объект, который будет запущен в рабочем процессе. .

Подпроцесс (или любая подключаемая библиотека) будет сериализовать только что созданный экземпляр (если вам нужна ссылка на экземпляр в корневом процессе, измените код по своему усмотрению) вместе со ссылкой на его метод push.

Спасибо. pool.map(push_wrapper, (cls.push() for cls in class_objects)) сработало, когда я использую cls().push , я получаю 'ClasName' object is not callable. Но я заметил, что эти методы push не работают параллельно.

Sandeep Lade 07.06.2024 09:42

Я этого не понимаю. Если class_objects содержит экземпляры классов, реализующих метод push, то выражение вашего генератора генерирует результаты вызова push для каждого экземпляра, и именно эти результаты передаются рабочей функции push_wrapper, которая, вероятно, получает исключение. То есть фактический вызов push выполняется основным процессом последовательно по мере того, как выражение-генератор генерирует свои результаты. В ОП также не указан минимальный воспроизводимый пример.

Booboo 07.06.2024 22:44

Мое выражение-генератор не выполняет вызов каждого метода push. Так бы и было, если бы там было написано (cls().push() for cls in class_objects) , но в приведенном выше коде нет () после push - круглые скобки в Python являются «оператором», который запускает вызов метода (или функции, точнее, «вызываемого»). . Без них у меня есть ссылка на связанный метод.

jsbueno 08.06.2024 21:33

@Booboo Но действительно, чтобы это работало с map, мне нужен фиксированный, «выбираемый» вызываемый объект — я обновил ответ, включив в него такую ​​сквозную оболочку, которая может работать с картой.

jsbueno 08.06.2024 22:04

Мне нужны новые очки :-)

Booboo 08.06.2024 22:31

@Booboo - по совпадению, я только сегодня наткнулся на этот пост в новостной рассылке, на которую подписываюсь: asim.bearblog.dev/how-a-single-chatgpt-mistake-cost-us-10000 - угадайте, что это было ошибка? :-)

jsbueno 10.06.2024 17:44
Ответ принят как подходящий

Ваш вопрос несколько расплывчатый. Если push_wrapper ожидает передачи экземпляра класса, экземпляр которого был создан основным процессом (что, судя по коду, так и есть), но вы хотите, чтобы вызовы метода push выполнялись параллельно, тогда:

class A:
    def __init__(self):
        print('A.__init__ called')

    def push(self):
        print('A.push called')

class B:
    def __init__(self):
        print('B.__init__ called')

    def push(self):
        print('B.push called')

def push_wrapper(class_instance):
    class_instance.push()

def main():
    from multiprocessing import Pool

    class_instances = [A(), B()]

    with Pool() as pool:
        pool.map(push_wrapper, class_instances)

if __name__ == '__main__':
    main()

Распечатки:

A.__init__ called
B.__init__ called
A.push called
B.push called

Но это не соответствует приведенному вами описанию на английском языке, а именно:

  1. Все эти методы должны выполняться параллельно.

Итак, если вы действительно хотите параллельно выполнить создание экземпляра класса с последующим вызовом push (т. е. «все эти методы»), то вы можете передать своей рабочей функции имя класса следующим образом:

class A:
    def __init__(self):
        print('A.__init__ called')

    def push(self):
        print('A.push called')

class B:
    def __init__(self):
        print('B.__init__ called')

    def push(self):
        print('B.push called')


def create_class_instance(class_name, *args, **kwargs):
    the_class = globals()[class_name]
    return the_class(*args, **kwargs)

def push_wrapper(class_name):
    class_instance = create_class_instance(class_name)
    class_instance.push()

def main():
    from multiprocessing import Pool

    class_names = ['A', 'B']

    with Pool() as pool:
        pool.map(push_wrapper, class_names)

if __name__ == '__main__':
    main()

Распечатки:

A.__init__ called
A.push called
B.__init__ called
B.push called

Другой способ передачи типов классов

class A:
    def __init__(self):
        print('A.__init__ called')

    def push(self):
        print('A.push called')

class B:
    def __init__(self):
        print('B.__init__ called')

    def push(self):
        print('B.push called')


def create_class_instance(class_type, *args, **kwargs):
    return class_type(*args, **kwargs)
    # Or return class_type.__call__(*args, **kwargs)

def push_wrapper(class_type):
    class_instance = create_class_instance(class_type)
    class_instance.push()

def main():
    from multiprocessing import Pool

    class_types = [A, B]

    with Pool() as pool:
        pool.map(push_wrapper, class_types)

if __name__ == '__main__':
    main()

Распечатки:

A.__init__ called
A.push called
B.__init__ called
B.push called

И если вы, возможно, указываете имя класса в другом пакете/модуле

Мы предполагаем, что каждый класс реализует метод append:

class A:
    def __init__(self, l=None):
        self.l = l or []

    def append(self, value):
        self.l.append(value)


    def __repr__(self):
        return f'A({self.l})'

def create_class_instance(package_class_name, *args, **kwargs):
    parts = package_class_name.split('.')
    l = len(parts)
    if l != 1:
        package_name = '.'.join(parts[0:l-1])
        class_name = parts[-1]
        module = __import__(package_name, globals(), locals(), [class_name])
        return module.__dict__[class_name](*args, *kwargs)
    return globals()[package_class_name](*args, **kwargs)

def append_wrapper(package_class_name, *args, **kwargs):
    class_instance = create_class_instance(package_class_name, *args, **kwargs)
    class_instance.append(4)
    print(class_instance)

def main():
    from multiprocessing import Pool

    class_names_and_args = [
        ('A', [1, 2, 3]),
        ('collections.deque', [1, 2, 3]),
        ('array.array', 'i', [1, 2, 3])
    ]

    with Pool() as pool:
        pool.starmap(append_wrapper, class_names_and_args)

if __name__ == '__main__':
    main()

Распечатки:

A([1, 2, 3, 4])
deque([1, 2, 3, 4])
array('i', [1, 2, 3, 4])

Спасибо, это сработало. Можете ли вы объяснить, почему передача экземпляров классов в Pool.map не работает, тогда как передача имен классов в Pool.map работает?

Sandeep Lade 11.06.2024 09:07

Извините, я хотел показать еще один способ передачи типов классов (а не экземпляров). Я обновил ответ. В зависимости от того, есть ли у вас тип или имя класса, вы можете использовать тот или иной метод.

Booboo 11.06.2024 11:48

Другие вопросы по теме