С помощью joblib можно выполнять несколько вызовов функции в Python.
from joblib import Parallel, delayed
def normal(x):
print "Normal", x
return x**2
if __name__ == '__main__':
results = Parallel(n_jobs=2)(delayed(normal)(x) for x in range(20))
print results
Дает: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
Однако я действительно хочу вызвать функцию класса для списка экземпляров класса параллельно. Функция просто сохраняет переменную класса. Позже я обращусь к этой переменной.
from joblib import Parallel, delayed
class A(object):
def __init__(self, x):
self.x = x
def p(self):
self.y = self.x**2
if __name__ == '__main__':
runs = [A(x) for x in range(20)]
Parallel(n_jobs=4)(delayed(run.p() for run in runs))
for run in runs:
print run.y
Это дает ошибку:
Traceback (most recent call last):
File "", line 1, in runfile('G:/My Drive/CODE/stackoverflow/parallel_classfunc/parallel_classfunc.py', wdir='G:/My Drive/CODE/stackoverflow/parallel_classfunc')
File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 710, in runfile execfile(filename, namespace)
File "C:\ProgramData\Anaconda2\lib\site-packages\spyder\utils\site\sitecustomize.py", line 86, in execfile exec(compile(scripttext, filename, 'exec'), glob, loc)
File "G:/My Drive/CODE/stackoverflow/parallel_classfunc/parallel_classfunc.py", line 12, in Parallel(n_jobs=4)(delayed(run.p() for run in runs))
File "C:\ProgramData\Anaconda2\lib\site-packages\joblib\parallel.py", line 183, in delayed pickle.dumps(function)
File "C:\ProgramData\Anaconda2\lib\copy_reg.py", line 70, in _reduce_ex raise TypeError, "can't pickle %s objects" % base.name
TypeError: can't pickle generator objects
Как можно использовать joblib с такими классами? Или есть лучший подход?






How is it possible to use
joblibwith classes like this ?
Давайте сначала предложим немного доработки кода:
Не все вещи подходят для функций подписи вызовов joblib.Parallel()( delayed() ), чтобы проглотить:
# >>> type( runs ) <type 'list'>
# >>> type( runs[0] ) <class '__main__.A'>
# >>> type( run.p() for run in runs ) <type 'generator'>
Итак, заставим ДЕМО-объекты проходить «через» aContainerFUN():
StackOverflow_DEMO_joblib.Parallel.py:
from sklearn.externals.joblib import Parallel, delayed
import time
class A( object ):
def __init__( self, x ):
self.x = x
self.y = "Defined on .__init__()"
def p( self ):
self.y = self.x**2
def aNormalFUN( aValueOfX ):
time.sleep( float( aValueOfX ) / 10. )
print ": aNormalFUN() has got aValueOfX == {0:} to process.".format( aValueOfX )
return aValueOfX * aValueOfX
def aContainerFUN( aPayloadOBJECT ):
time.sleep( float( aPayloadOBJECT.x ) / 10. )
# try: except: finally:
pass; aPayloadOBJECT.p()
print "| aContainerFUN: has got aPayloadOBJECT.id({0:}) to process. [ Has made .y == {1:}, given .x == {2: } ]".format( id( aPayloadOBJECT ), aPayloadOBJECT.y, aPayloadOBJECT.x )
time.sleep( 1 )
if __name__ == '__main__':
# ------------------------------------------------------------------
results = Parallel( n_jobs = 2
)( delayed( aNormalFUN )( aParameterX )
for aParameterX in range( 11, 21 )
)
print results
print '.'
# ------------------------------------------------------------------
pass; runs = [ A( x ) for x in range( 11, 21 ) ]
# >>> type( runs ) <type 'list'>
# >>> type( runs[0] ) <class '__main__.A'>
# >>> type( run.p() for run in runs ) <type 'generator'>
Parallel( verbose = 10,
n_jobs = 2
)( delayed( aContainerFUN )( run )
for run in runs
)
C:\Python27.anaconda> python StackOverflow_DEMO_joblib.Parallel.py
: aNormalFUN() has got aValueOfX == 11 to process.
: aNormalFUN() has got aValueOfX == 12 to process.
: aNormalFUN() has got aValueOfX == 13 to process.
: aNormalFUN() has got aValueOfX == 14 to process.
: aNormalFUN() has got aValueOfX == 15 to process.
: aNormalFUN() has got aValueOfX == 16 to process.
: aNormalFUN() has got aValueOfX == 17 to process.
: aNormalFUN() has got aValueOfX == 18 to process.
: aNormalFUN() has got aValueOfX == 19 to process.
: aNormalFUN() has got aValueOfX == 20 to process.
[121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
.
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 121, given .x == 11 ]
| aContainerFUN: has got aPayloadOBJECT.id(50369168) to process. [ Has made .y == 144, given .x == 12 ]
[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 2.4s
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 169, given .x == 13 ]
| aContainerFUN: has got aPayloadOBJECT.id(12896752) to process. [ Has made .y == 196, given .x == 14 ]
[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 4.9s
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 225, given .x == 15 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856464) to process. [ Has made .y == 256, given .x == 16 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 289, given .x == 17 ]
| aContainerFUN: has got aPayloadOBJECT.id(50368592) to process. [ Has made .y == 324, given .x == 18 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 361, given .x == 19 ]
| aContainerFUN: has got aPayloadOBJECT.id(12856528) to process. [ Has made .y == 400, given .x == 20 ]
[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 13.3s finished
Конечно, ответ продемонстрировал полностью рабочий код, с исправленным синтаксисом для вызовов внутри Parallel(...)( delayed(...)...). Вызов класс-метод .p() был продемонстрирован как отложенный в соответствующих новых процессах, созданных фабрикой Parallel. Так работает joblib, при использовании внутреннего движка multiprocessing. Ваш пост не соответствует синтаксису сигнатуры вызова (см. Подробности трассировки выше), который, как было продемонстрировано выше, был отремонтирован с полностью работоспособным кодом остается доступным для повторяемой повторной проверки (й).
АГА! Да, я следовал вашему синтаксису и воспроизвел поведение вашего кода. Последнее, что нужно сделать - правильно обновить список экземпляров класса ("запусков")! aContainerFUN должен быть return aPayloadOBJECT, чтобы мы могли делать runs = Parallel(verbose=10, n_jobs=2)( delayed(worker)(run) for run in runs ). Тогда мы сможем получить доступ к runs[0].y, как хотелось бы в моем вопросе.
Конечно, следующим шагом будет измерение реальных реальных затрат на переход на параллельные пути. Чистая эффективность (выраженная как чистая реальная выгода / общая сумма всех расходы, включая все дополнительные накладные расходы - здесь проходы SER / DES при консервировании / распаковке + затраты на управление процессами, регулируемые многопроцессорной обработкой) решает. Не стесняйтесь читать более подробную информацию о скрытый накладные расходы и если, любые ускорения были фактически достигнуты (если и только если> 1.0) в эта статья >>> stackoverflow.com/revisions/18374629/3. Основной вопрос для тестирования + ответ на рефакторинг перед
Спасибо за ваш ответ, но у меня это не сработало. Вы говорите, что сначала сгенерируйте список прогонов, а затем передайте их через обычную функцию, которая вызывает функцию класса.
class A(object): def __init__(self, x): self.x = x self.y = "Defined on .__init__()" def p(self): self.y = self.x**2 print "Worked", self.x, self.y def worker(r): r.p() if __name__ == '__main__': runs = [A(x) for x in range(20)] Parallel(n_jobs=2)( delayed(worker)(run) for run in runs )