Вызов функций параллельного класса с использованием python joblib

С помощью joblib можно выполнять несколько вызовов функции в Python.

from joblib import Parallel, delayed 

def normal(x):
    print "Normal", x
    return x**2

if  __name__ == '__main__':

    results = Parallel(n_jobs=2)(delayed(normal)(x) for x in range(20))
    print results

Дает: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]

Однако я действительно хочу вызвать функцию класса для списка экземпляров класса параллельно. Функция просто сохраняет переменную класса. Позже я обращусь к этой переменной.

from joblib import Parallel, delayed 

class A(object):
    def __init__(self, x):
        self.x = x
    def p(self):
        self.y = self.x**2

if  __name__ == '__main__':

    runs = [A(x) for x in range(20)]
    Parallel(n_jobs=4)(delayed(run.p() for run in runs))
    for run in runs:
        print run.y

Это дает ошибку:

Traceback (most recent call last):

File "", line 1, in runfile('G:/My Drive/CODE/stackoverflow/parallel_classfunc/parallel_classfunc.py', wdir='G:/My Drive/CODE/stackoverflow/parallel_classfunc')

File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 710, in runfile execfile(filename, namespace)

File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 86, in execfile exec(compile(scripttext, filename, 'exec'), glob, loc)

File "G:/My Drive/CODE/stackoverflow/parallel_classfunc/parallel_classfunc.py", line 12, in Parallel(n_jobs=4)(delayed(run.p() for run in runs))

File "C:\ProgramData\Anaconda2\lib\site-packages\joblib\parallel.py", line 183, in delayed pickle.dumps(function)

File "C:\ProgramData\Anaconda2\lib\copy_reg.py", line 70, in _reduce_ex raise TypeError, "can't pickle %s objects" % base.name

TypeError: can't pickle generator objects

Как можно использовать joblib с такими классами? Или есть лучший подход?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
3 691
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

How is it possible to use joblib with classes like this ?

Давайте сначала предложим немного доработки кода:

Не все вещи подходят для функций подписи вызовов joblib.Parallel()( delayed() ), чтобы проглотить:

# >>> type( runs )                        <type 'list'>
# >>> type( runs[0] )                     <class '__main__.A'>
# >>> type( run.p() for run in runs )     <type 'generator'>

Итак, заставим ДЕМО-объекты проходить «через» aContainerFUN():

StackOverflow_DEMO_joblib.Parallel.py:

from sklearn.externals.joblib import Parallel, delayed
import time

class A( object ):

    def __init__( self, x ):
        self.x = x
        self.y = "Defined on .__init__()"

    def p(        self ):
        self.y = self.x**2

def aNormalFUN( aValueOfX ):
    time.sleep( float( aValueOfX ) / 10. )
    print ": aNormalFUN() has got aValueOfX == {0:} to process.".format( aValueOfX )
    return aValueOfX * aValueOfX

def aContainerFUN( aPayloadOBJECT ):
    time.sleep( float( aPayloadOBJECT.x ) / 10. )
    # try: except: finally:
    pass;  aPayloadOBJECT.p()
    print  "| aContainerFUN: has got aPayloadOBJECT.id({0:}) to process. [ Has made .y == {1:}, given .x == {2: } ]".format( id( aPayloadOBJECT ), aPayloadOBJECT.y, aPayloadOBJECT.x )
    time.sleep( 1 )

if __name__ == '__main__':
     # ------------------------------------------------------------------
     results = Parallel( n_jobs = 2
                         )(       delayed( aNormalFUN )( aParameterX )
                         for                             aParameterX in range( 11, 21 )
                         )
     print results
     print '.'
     # ------------------------------------------------------------------
     pass;       runs = [ A( x ) for x in range( 11, 21 ) ]
     # >>> type( runs )                        <type 'list'>
     # >>> type( runs[0] )                     <class '__main__.A'>
     # >>> type( run.p() for run in runs )     <type 'generator'>

     Parallel( verbose = 10,
               n_jobs  = 2
               )(        delayed( aContainerFUN )( run )
               for                                 run in runs
               )

Полученные результаты ? Работает как шарм!

C:\Python27.anaconda> python StackOverflow_DEMO_joblib.Parallel.py
: aNormalFUN() has got aValueOfX == 11 to process.
: aNormalFUN() has got aValueOfX == 12 to process.
: aNormalFUN() has got aValueOfX == 13 to process.
: aNormalFUN() has got aValueOfX == 14 to process.
: aNormalFUN() has got aValueOfX == 15 to process.
: aNormalFUN() has got aValueOfX == 16 to process.
: aNormalFUN() has got aValueOfX == 17 to process.
: aNormalFUN() has got aValueOfX == 18 to process.
: aNormalFUN() has got aValueOfX == 19 to process.
: aNormalFUN() has got aValueOfX == 20 to process.
[121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
.
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 121, given .x ==  11 ]
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 144, given .x ==  12 ]
[Parallel(n_jobs=2)]: Done   1 tasks      | elapsed:    2.4s
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 169, given .x ==  13 ]
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 196, given .x ==  14 ]
[Parallel(n_jobs=2)]: Done   4 tasks      | elapsed:    4.9s
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 225, given .x ==  15 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 256, given .x ==  16 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 289, given .x ==  17 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 324, given .x ==  18 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 361, given .x ==  19 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 400, given .x ==  20 ]
[Parallel(n_jobs=2)]: Done  10 out of  10 | elapsed:   13.3s finished

Спасибо за ваш ответ, но у меня это не сработало. Вы говорите, что сначала сгенерируйте список прогонов, а затем передайте их через обычную функцию, которая вызывает функцию класса. class A(object): def __init__(self, x): self.x = x self.y = "Defined on .__init__()" def p(self): self.y = self.x**2 print "Worked", self.x, self.y def worker(r): r.p() if __name__ == '__main__': runs = [A(x) for x in range(20)] Parallel(n_jobs=2)( delayed(worker)(run) for run in runs )

feedMe 06.06.2018 15:57

Конечно, ответ продемонстрировал полностью рабочий код, с исправленным синтаксисом для вызовов внутри Parallel(...)( delayed(...)...). Вызов класс-метод .p() был продемонстрирован как отложенный в соответствующих новых процессах, созданных фабрикой Parallel. Так работает joblib, при использовании внутреннего движка multiprocessing. Ваш пост не соответствует синтаксису сигнатуры вызова (см. Подробности трассировки выше), который, как было продемонстрировано выше, был отремонтирован с полностью работоспособным кодом остается доступным для повторяемой повторной проверки (й).

user3666197 06.06.2018 16:17

АГА! Да, я следовал вашему синтаксису и воспроизвел поведение вашего кода. Последнее, что нужно сделать - правильно обновить список экземпляров класса ("запусков")! aContainerFUN должен быть return aPayloadOBJECT, чтобы мы могли делать runs = Parallel(verbose=10, n_jobs=2)( delayed(worker)(run) for run in runs ). Тогда мы сможем получить доступ к runs[0].y, как хотелось бы в моем вопросе.

feedMe 06.06.2018 16:54

Конечно, следующим шагом будет измерение реальных реальных затрат на переход на параллельные пути. Чистая эффективность (выраженная как чистая реальная выгода / общая сумма всех расходы, включая все дополнительные накладные расходы - здесь проходы SER / DES при консервировании / распаковке + затраты на управление процессами, регулируемые многопроцессорной обработкой) решает. Не стесняйтесь читать более подробную информацию о скрытый накладные расходы и если, любые ускорения были фактически достигнуты (если и только если> 1.0) в эта статья >>> stackoverflow.com/revisions/18374629/3. Основной вопрос для тестирования + ответ на рефакторинг перед

user3666197 07.06.2018 11:58

Другие вопросы по теме