У меня есть разные классы Python, и все они имеют метод push. То, что я пытаюсь сделать, это
Я пробовал несколько способов, используя
#from pathos.multiprocessing import ProcessingPool as Pool
from multiprocessing import Pool
def push_wrapper(obj):
obj.push()
class_objects=[class1_obj1,class2_obj2,class3_obj3]
with Pool(processes=max_thread_count) as pool:
pool.map(push_wrapper, class_objects)
pool.close()
pool.join()
При этом я получаю эту ошибку
поднять TypeError(f"невозможно выбрать объект {self.class.name!r}")
Есть и другие подходы, например использование pool.apply_async
, но они не ждут немедленного завершения выхода из всех методов. Когда я добавляю job.wait()
вместе с pool.apply_async
, он ожидает завершения всех потоков, но я хочу напечатать результат потока по мере его завершения.
определенные объекты нелегко перенести в другой процесс (например, дескрипторы файлов, каналы, сокеты и т. д.), а содержание этих объектов в качестве атрибутов класса делает невозможным отправку экземпляра класса дочернему процессу для выполнения его метода. . Вы можете настроить сериализацию и десериализацию вашего класса, определив методы __getstate__
и __setstate__
, если вы лучше знаете, как реконструировать объект (т.е. сертификаты сертификатов можно реконструировать, а не копировать напрямую).
Как было объяснено в предыдущих комментариях, вы не можете передать какой-либо собственный класс в новые процессы. Таким образом, вам необходимо реализовать способы десериализации объекта и его повторной сериализации. Одним из решений, которое следует рассмотреть, будет создание json из класса, передача json в пул.map, а затем создание нового экземпляра класса из данного json.
Можно ссылку на пример? @ДжонниЧизекаттер
Также рассмотрите возможность использования модуля Pool из pathos
: from pathos.multiprocessing import ProcessingPool as Pool
Он использует dill
вместо pickle
и, таким образом, лучше справляется с травлением.
Кстати, включите соответствующие операторы import
в свой фрагмент кода, чтобы ваш пример действительно воспроизводился. (в данном случае импорт класса Pool
)
@JohnnyCheesecutter я пробовал это раньше, но безуспешно
@jsbueno Готово, обновлено
Вы хотите передать рабочие экземпляры своих классов, а затем вызвать метод push
каждого из них. Классы не подлежат сериализации (внутренне Python использует для этого стандартную библиотеку pickle
) и не могут передаваться в качестве аргументов рабочим процессам.
С другой стороны, экземпляры обычного класса могут передаваться (обратите внимание, что специальные классы, созданные расширениями в собственном коде, могут не иметь поддержки Pickle - но это можно реализовать - проверьте документацию протокола Pickle) - и тогда все рабочие нужно сделать, это вызвать метод push
для каждого.
TL;DR: вы должны заставить его работать, изменив оболочку и одну строку карты:
...
def push_wrapper(push_callable):
return push_callable()
...
with Pool(...) as pool:
pool.map(push_wrapper, (cls().push for cls in class_objects))
...
Обратите внимание, что выражение (cls().push for cls in class_objects)
является выражением-генератором, то есть итерацией, приемлемой для pool.map
, и для каждого элемента в последовательности вашего класса создается экземпляр (cls()
) и ссылается на его метод .push
как целевой вызываемый объект, который будет запущен в рабочем процессе. .
Подпроцесс (или любая подключаемая библиотека) будет сериализовать только что созданный экземпляр (если вам нужна ссылка на экземпляр в корневом процессе, измените код по своему усмотрению) вместе со ссылкой на его метод push
.
Спасибо. pool.map(push_wrapper, (cls.push() for cls in class_objects))
сработало, когда я использую cls().push , я получаю 'ClasName' object is not callable
. Но я заметил, что эти методы push не работают параллельно.
Я этого не понимаю. Если class_objects
содержит экземпляры классов, реализующих метод push
, то выражение вашего генератора генерирует результаты вызова push
для каждого экземпляра, и именно эти результаты передаются рабочей функции push_wrapper
, которая, вероятно, получает исключение. То есть фактический вызов push
выполняется основным процессом последовательно по мере того, как выражение-генератор генерирует свои результаты. В ОП также не указан минимальный воспроизводимый пример.
Мое выражение-генератор не выполняет вызов каждого метода push
. Так бы и было, если бы там было написано (cls().push() for cls in class_objects)
, но в приведенном выше коде нет ()
после push
- круглые скобки в Python являются «оператором», который запускает вызов метода (или функции, точнее, «вызываемого»). . Без них у меня есть ссылка на связанный метод.
@Booboo Но действительно, чтобы это работало с map
, мне нужен фиксированный, «выбираемый» вызываемый объект — я обновил ответ, включив в него такую сквозную оболочку, которая может работать с картой.
Мне нужны новые очки :-)
@Booboo - по совпадению, я только сегодня наткнулся на этот пост в новостной рассылке, на которую подписываюсь: asim.bearblog.dev/how-a-single-chatgpt-mistake-cost-us-10000 - угадайте, что это было ошибка? :-)
Ваш вопрос несколько расплывчатый. Если push_wrapper
ожидает передачи экземпляра класса, экземпляр которого был создан основным процессом (что, судя по коду, так и есть), но вы хотите, чтобы вызовы метода push
выполнялись параллельно, тогда:
class A:
def __init__(self):
print('A.__init__ called')
def push(self):
print('A.push called')
class B:
def __init__(self):
print('B.__init__ called')
def push(self):
print('B.push called')
def push_wrapper(class_instance):
class_instance.push()
def main():
from multiprocessing import Pool
class_instances = [A(), B()]
with Pool() as pool:
pool.map(push_wrapper, class_instances)
if __name__ == '__main__':
main()
Распечатки:
A.__init__ called
B.__init__ called
A.push called
B.push called
Но это не соответствует приведенному вами описанию на английском языке, а именно:
Итак, если вы действительно хотите параллельно выполнить создание экземпляра класса с последующим вызовом push
(т. е. «все эти методы»), то вы можете передать своей рабочей функции имя класса следующим образом:
class A:
def __init__(self):
print('A.__init__ called')
def push(self):
print('A.push called')
class B:
def __init__(self):
print('B.__init__ called')
def push(self):
print('B.push called')
def create_class_instance(class_name, *args, **kwargs):
the_class = globals()[class_name]
return the_class(*args, **kwargs)
def push_wrapper(class_name):
class_instance = create_class_instance(class_name)
class_instance.push()
def main():
from multiprocessing import Pool
class_names = ['A', 'B']
with Pool() as pool:
pool.map(push_wrapper, class_names)
if __name__ == '__main__':
main()
Распечатки:
A.__init__ called
A.push called
B.__init__ called
B.push called
Другой способ передачи типов классов
class A:
def __init__(self):
print('A.__init__ called')
def push(self):
print('A.push called')
class B:
def __init__(self):
print('B.__init__ called')
def push(self):
print('B.push called')
def create_class_instance(class_type, *args, **kwargs):
return class_type(*args, **kwargs)
# Or return class_type.__call__(*args, **kwargs)
def push_wrapper(class_type):
class_instance = create_class_instance(class_type)
class_instance.push()
def main():
from multiprocessing import Pool
class_types = [A, B]
with Pool() as pool:
pool.map(push_wrapper, class_types)
if __name__ == '__main__':
main()
Распечатки:
A.__init__ called
A.push called
B.__init__ called
B.push called
И если вы, возможно, указываете имя класса в другом пакете/модуле
Мы предполагаем, что каждый класс реализует метод append
:
class A:
def __init__(self, l=None):
self.l = l or []
def append(self, value):
self.l.append(value)
def __repr__(self):
return f'A({self.l})'
def create_class_instance(package_class_name, *args, **kwargs):
parts = package_class_name.split('.')
l = len(parts)
if l != 1:
package_name = '.'.join(parts[0:l-1])
class_name = parts[-1]
module = __import__(package_name, globals(), locals(), [class_name])
return module.__dict__[class_name](*args, *kwargs)
return globals()[package_class_name](*args, **kwargs)
def append_wrapper(package_class_name, *args, **kwargs):
class_instance = create_class_instance(package_class_name, *args, **kwargs)
class_instance.append(4)
print(class_instance)
def main():
from multiprocessing import Pool
class_names_and_args = [
('A', [1, 2, 3]),
('collections.deque', [1, 2, 3]),
('array.array', 'i', [1, 2, 3])
]
with Pool() as pool:
pool.starmap(append_wrapper, class_names_and_args)
if __name__ == '__main__':
main()
Распечатки:
A([1, 2, 3, 4])
deque([1, 2, 3, 4])
array('i', [1, 2, 3, 4])
Спасибо, это сработало. Можете ли вы объяснить, почему передача экземпляров классов в Pool.map не работает, тогда как передача имен классов в Pool.map работает?
Извините, я хотел показать еще один способ передачи типов классов (а не экземпляров). Я обновил ответ. В зависимости от того, есть ли у вас тип или имя класса, вы можете использовать тот или иной метод.
ошибка, которую мы получаем при запуске вашего кода, имеет вид
NameError: name 'class1_obj1' is not defined
, godbolt.org/z/EYsfa4EGW , напишите минимальный пример