Я использую Python 3.9.1
Примечание. Я знаю, что есть вопросы с похожими названиями. Но эти вопросы встроены в сложный код, который затрудняет понимание проблемы. Это голая реализация проблемы, которую, я думаю, другим будет легче переварить.
Обновлено: у меня есть Pool(processes=64)
в моем коде. Но большинству других, вероятно, придется изменить это в зависимости от того, сколько ядер есть на их компьютере. И если это займет слишком много времени, измените listLen
на меньшее число
Я пытаюсь узнать о многопроцессорности, чтобы решить проблему на работе. У меня есть список массивов, с которыми мне нужно сделать попарное сравнение массивов. Но для простоты я воссоздал суть проблемы с простыми целыми числами вместо массивов и функцией сложения вместо вызова какой-то сложной функции сравнения. С приведенным ниже кодом я сталкиваюсь с титульной ошибкой
import time
from multiprocessing import Pool
import itertools
import random
def add_nums(a, b):
return(a + b)
if __name__ == "__main__":
listLen = 1000
# Create a list of random numbers to do pairwise additions of
myList = [random.choice(range(1000)) for i in range(listLen)]
# Create a list of all pairwise combinations of the indices of the list
index_combns = [*itertools.combinations(range(len(myList)),2)]
# Do the pairwise operation without multiprocessing
start_time = time.time()
sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
end_time = time.time() - start_time
print(f"Process took {end_time} seconds with no MP")
# Do the pairwise operations with multiprocessing
start_time = time.time()
pool = Pool(processes=64)
sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
end_time = time.time() - start_time
print(f"Process took {end_time} seconds with MP")
pool.close()
pool.join()
Python не может обрабатывать лямбда-функции. Вместо этого вы должны определить функцию и вместо этого передать имя функции. Вот как вы можете подойти к этому:
import itertools
import random
import time
from multiprocessing import Pool
def add_nums(a, b):
return a + b
def foo(x):
return add_nums(x[0], x[1])
if __name__ == "__main__":
listLen = 1000
# Create a list of random numbers to do pairwise additions of
myList = [random.choice(range(1000)) for i in range(listLen)]
# Create a list of all pairwise combinations of the indices of the list
index_combns = [
(myList[i[0]], myList[i[1]])
for i in itertools.combinations(range(len(myList)), 2)
]
# Do the pairwise operation without multiprocessing
start_time = time.time()
sums_no_mp = [*map(foo, index_combns)]
end_time = time.time() - start_time
print(f"Process took {end_time} seconds with no MP")
# Do the pairwise operations with multiprocessing
start_time = time.time()
pool = Pool(processes=64)
sums_mp = pool.map(foo, index_combns)
end_time = time.time() - start_time
print(f"Process took {end_time} seconds with MP")
pool.close()
pool.join()
Я изменил index_combns
, чтобы также извлечь значение из myList
на месте, потому что myList
не будет доступен из foo
, а передача нескольких копий myList
увеличит сложность вашего скрипта.
Запуск этой печати:
Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP
При всем уважении, возможно, вы захотите исправить свое утверждение, что (цит.) "Python не может рассолить..." — это неверное утверждение. Python не может распарывать лямбда-выражения, если используется pickle-module. Python умеет мариновать лямбды при использовании dill-модуля (уже лет 10).
Я не совсем уверен, что Зачем (хотя тщательное чтение многопроцессорные документы, вероятно, дало бы ответ), но в многопроцессорной обработке python задействован процесс травления, когда дочерним процессам передаются определенные вещи. Хотя я ожидал, что лямбды будут унаследованы, а не переданы через рассола, я думаю, что это не то, что происходит.
После обсуждения в комментариях рассмотрим примерно такой подход:
import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory
def add_mats(a, b):
#time.sleep(0.00001)
return (a + b)
# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
shm = shared_memory.SharedMemory(name=shm_name)
stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
a = stacked[i1]
b = stacked[i2]
result = add_mats(a, b)
shm.close()
return result
if __name__ == "__main__":
class Timer:
def __init__(self):
self.start = None
self.stop = None
self.delta = None
def __enter__(self):
self.start = time.time()
return self
def __exit__(self, *exc_args):
self.stop = time.time()
self.delta = self.stop - self.start
arrays = [np.random.rand(5,5) for _ in range(50)]
index_combns = list(itertools.combinations(range(len(arrays)),2))
# Helper for non-mp version
def add_mats_pair(ij_pair):
i, j = ij_pair
a = arrays[i]
b = arrays[j]
return add_mats(a, b)
with Timer() as t:
# Do the pairwise operation without multiprocessing
sums_no_mp = list(map(add_mats_pair, index_combns))
print(f"Process took {t.delta} seconds with no MP")
with Timer() as t:
# Stack arrays and copy result into shared memory
stacked = np.stack(arrays)
shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
shm_arr[:] = stacked[:]
with Pool(processes=32) as pool:
processes = [pool.apply_async(add_mats_shared, (
shm.name,
stacked.shape,
stacked.dtype,
i,
j,
)) for (i,j) in index_combns]
sums_mp = [p.get() for p in processes]
shm.close()
shm.unlink()
print(f"Process took {t.delta} seconds with MP")
for i in range(len(sums_no_mp)):
assert (sums_no_mp[i] == sums_mp[i]).all()
print("Results match.")
Он использует multiprocessing.shared_memory для совместного использования одного массива размерностей numpy (Н+1) (вместо списка массивов размерностей Н) между хост-процессом и дочерними процессами.
Другие вещи, которые отличаются, но не имеют значения:
Pool
используется в качестве диспетчера контекста, чтобы избежать явного закрытия и присоединения к нему.Timer
— это простой контекстный менеджер для временных блоков кода.pool.map
заменен на звонки pool.apply_async
pool.map
тоже подойдет, но вам нужно создать список аргументов перед вызовом .map
и распаковать его в рабочей функции, например:
with Pool(processes=32) as pool:
args = [(
shm.name,
stacked.shape,
stacked.dtype,
i,
j,
) for (i,j) in index_combns]
sums_mp = pool.map(add_mats_shared, args)
# and
# Helper for mp version
def add_mats_shared(args):
shm_name, array_shape, array_dtype, i1, i2 = args
shm = shared_memory.SharedMemory(name=shm_name)
....
Спасибо за помощь. Хотя я пытаюсь вызвать функцию add_nums
для элементов myList
, а не для индексов, должен ли я передавать всю myList
в функцию при каждом вызове? Не вызовет ли это каких-либо проблем с производительностью?
@The_Questioner О, вы могли бы просто вызывать комбинации в myList напрямую, а не индексы. Я обновлю.
Спасибо еще раз. К сожалению, это невозможно для моей реальной задачи (хотя это будет работать для этой упрощенной версии), потому что в моей реальной задаче вместо простых целых чисел myList
представляет собой список больших массивов. Таким образом, создание таких комбинаций приведет к созданию тысяч ненужных копий этих больших массивов.
Справедливо, редактирую...
Хм, я так и не разобрался. Мне было интересно, сможете ли вы заставить его работать, не проходя полностью myList
снова и снова.
Извините, я никогда не обновлял его. Я думаю, вам понадобится что-то вроде Менеджер или Общая память. Чтобы эффективно обеспечивать myList
дочерние процессы. myList
- это список пустых массивов?
Да, это пустые массивы длиной ~ 70 000. Извините за поздний ответ.
@The_Questioner обновлен с использованием подхода shared_memory
Благодарю вас! Я читаю об общей памяти, но, похоже, это то, что мне нужно, чтобы обойти мою проблему. По какой-то причине MP-версия намного медленнее, чем не-MP, но я попытаюсь это выяснить. Моим главным препятствием была ошибка рассола, и вы очень помогли с этим. Спасибо!
@The_Questioner «По какой-то причине MP-версия намного медленнее, чем не-MP, но я попытаюсь это выяснить». Попробуйте раскомментировать строку time.sleep
внутри функции add_mats
(эмулируя более дорогой обратный вызов worker), и вы должны увидеть изменение производительности, как и ожидали.
@jedwards - ваше предложение игнорирует все дополнительные расходы, которые убивают производительность, когда масштабы увеличиваются (здесь это обычное явление) - (а) операции со списками «хороши» в школьных учебниках, как SLOC, но убийцы в больших Оперативная память (как в данном случае), тем хуже, если внутри «хороших» итераторов (б) затраты на добавление интерпретатора Python для создания экземпляра процесса являются огромными блокировщиками — посмотрите на влияние закона Амдала, если мы начнем не игнорировать дополнительные затраты (инструкции никогда не присутствуют в чистом [SERIAL] базовом запуске) stackoverflow.com/revisions/18374629/3 (c) последнее, но не менее важное, обработанный блок numpy ...
@user3666197 user3666197 У меня действительно нет предложения. ОП представил пример проблемы, когда они хотели поделиться большим набором массивов без многократного копирования каждого, и это решает эту проблему. Поскольку это примерная проблема, у меня нет возможности узнать, является ли MP правильным маршрутом или нет, но я предполагаю, что они смогут выяснить это довольно быстро, как только заработают. С точки зрения (с) неясно, что вы имеете в виду. Если вы говорите о GIL и потоках, то это процессы. Если вы говорите о пустых библиотеках, таких как MKL или BLAS, это выходит за рамки этого вопроса, imo.
Согласитесь, что O/P заявил о желании учиться. Тем не менее, лямбда-маринование является маргинальным блокировщиком в этом, совместное использование является убийцей производительности как внутри экосистемы интерпретатора Python (на основе потоков, из-за блокировки GIL), так и в экзосистеме (на основе процессов, из-за снижения производительности). "-эмуляция накладных расходов конвейера SER/xfer/DES между процессами, чем больше блокирует независимость, процессы были порождены для получения, не так ли? ) в (c) опечатка /извините/ аргумент был в том, что чем больше процессов занимают ядра ЦП, тем меньше ядер остается свободным для собственных многоядерных запусков уровня HPC.
ИМХО, конфликтующие анти-паттерны здесь не выходят за рамки, так как в результате производительность только пострадает. Наличие явного оператора O/P о том, что код использует большой-список большой-numpy-массивов, является мигающим предупреждением о красном свете, это никогда не будет работать быстро в поэтапном P2P-распределении последовательного итератора исчисления парных списков-членов. Можно не говорить об этом О/П, но в таком случае мы не дали здесь честного совета, не так ли? Я не пытаюсь умолять вступать в наномасштабные споры о (не)преимуществах MKL против BLAS, но оставлять O/P внутри гигантской дыры расширяющегося анти-паттерна?
Q :
" ... trying to learn about multiprocessing in order to solve a problem at work. "
А:
единственный самый важный опыт, который нужно изучить
, это то, насколько БОЛЬШИМ являются РАСХОДЫ-из-(процесса)-ИНСТАНЦИЯ(s),
все остальные дополнительные накладные расходы
(но ни в коем случае не незначительные, тем более в увеличение масштабов проблемы )
детали по сравнению с этой огромной и главной.
Before the answer is read-through and completely understood, here is a Live GUI-interactive simulator of how much we will have to pay to start using more than 1 stream of process-flow orchestration ( costs vary - lower for threads, larger for MPI-based distributed operations, highest for multiprocessing
-processes, as used in Python Interpreter, where N-many copies of the main Python Interpreter process get first copied (RAM-allocated and O/S scheduler spawned - as 2022-Q2 still reports issues if less expensive backends try to avoid this cost, yet at problems with deadlocking on wrong-shared or ill-copied or forgotten to copy some already blocking MUTEX-es and similar internalities - so that even the full-copy is not always safe in 2022 - not having met them in person does not mean these do not still exist, as documented by countless professionals - a story about a pool of sharks is a good place to start from )
а) травление лямбд ( and many other SER/DES blockers )
легко — достаточно conda install dill
и import dill as pickle
как умеет укроп, годами травить их — кредит @MikeMcKearns и вашему коду не нужно рефакторить использование простого интерфейса pickle.dumps()
-вызова. Таким образом, использование pathos.multiprocess
по умолчанию используется для внутреннего использования dill
, и этой уже давно известной слабости multiprocessing
SER/DES удается избежать.
б) убийцы производительности
- multiprocessing.Pool.map()
это скорее End-to-End анти-шаблон производительности здесь - Расходы..., если мы начнем не пренебрегать ими, покажите, сколько CPU-тактов и заблокированных физических-RAM-вводов/выводов платится за столько процессов- экземпляров ( 60+ ), которые, наконец, «занимают» почти все физические ядра ЦП, но оставляют, таким образом, почти нулевое пространство для действительно высокой производительности numpy
-родных многоядерных вычислений основной проблемы (для которой ожидалось, что максимальная производительность будет форсирован, не так ли? )
- просто переместите ползунок п в симуляторе на что-то меньшее, чем 100% (без [SERIAL]
-части выполнения задачи, что хорошо в теории, но никогда не выполнимо на практике, даже запуск программы является чистым [SERIAL]
, по замыслу )
- просто переместите ползунок Оverhead в симуляторе на что-нибудь выше простого нуля (выражая относительную дополнительную стоимость создания одного из процессов НCPUcores, как число процентов, по отношению к такому номеру части [PARALLEL]
-раздела инструкций - математически "плотная" работа имеет много таких "полезных"-инструкций и может , предполагая, что никакие другие убийцы производительности не выпрыгивают из коробки, могут потратить некоторую разумную сумму «дополнительных» затрат, чтобы породить некоторое количество одновременных или параллельных операций (фактическое число зависит только от фактического экономия затрат, а не от того, сколько CPU-ядра присутствуют, тем более по нашим "пожеланиям" или схоластическим или еще хуже копипаст-"советам" ). Наоборот, математически «мелкая» работа почти всегда имеет «ускорение» << 1 (замедление огромный), так как почти нет шансов оправдать известные дополнительные затраты (оплата на процесс-инстанциациях, данные SER/xfer /DES перемещается внутрь (параметры) и назад (результаты)
- далее переместите ползунок Оverhead в симуляторе к самому правому краю == 1
. Это показывает случай, когда фактические накладные расходы (потеря времени), порождающие процесс, по-прежнему не превышают <= 1 %
всех последующих инструкций, связанных с вычислениями, которые будут выполняться для «полезной» части. работы, которая будет вычисляться внутри такого порожденного экземпляра процесса. Таким образом, даже такой коэффициент пропорции 1:100 (выполнение в 100 раз больше «полезной» работы, чем потерянное процессорное время, сожженное для организации такого количества копий и заставляющего O/S-планировщика организовывать их параллельное выполнение внутри доступной системной виртуальной памяти) имеет уже все предупреждения видны на графике прогрессии Speedup-деградации - просто поиграйте немного с Оverhead-слайдером в симуляторе, прежде чем трогать остальные...
- избежать греха «совместного использования» (если целью является производительность) - опять же, затраты на эксплуатацию такой оркестровки между несколькими процессами интерпретатора Python, теперь независимыми, влекут за собой дополнительные дополнительные затраты, которые никогда не оправдываются повышением производительности, поскольку борьба за занимая общие ресурсы (ядра ЦП, каналы физической ОЗУ-ввода-вывода) только опустошает коэффициенты повторного использования кэш-памяти процессора, переключатели контекста процесса, управляемые планировщиком O/S, и все это еще больше снижает итоговую -Конец производительности (чего мы не хотим, не так ли?)
в) повышение работоспособности
- уважайте факты о фактические затраты любой вычислительной операции
- избегайте "поверхностных" шагов вычислений,
- максимизируйте то, что так дорого обходится в набор распределенных-процессов (если такая необходимость остается),
- избегайте все операции добавления служебных данных (например, добавление локальных временных переменных, где встроенные операции позволяют вместо хранения частичных результатов)
и
- попробуйте перейти к использованию сверхпроизводительного, дружественного к кэшу и оптимизированного, родного numpy
-векторизованного многоядерного и пошагового возможности трюков, не блокируемые предварительно перегруженными ядрами ЦП за счет планирования такого количества (~ 60) копий процесса Python Interpreter, каждая из которых пытается вызвать numpy
-код, таким образом, не имея свободных ядер для фактического размещения такого высокопроизводительного кэша. -удобные для повторного использования векторные вычисления (здесь мы получаем или теряем большую часть производительности, а не в медленных последовательных итераторах, не в порождении 60+ основанных на процессах полных копий «__main__
» интерпретатора Python, прежде чем выполнять один кусок полезной работы на наших больших данных, дорого РА M-распределено и физически скопировано более 60 раз)
- рефакторинг реальной проблемы никогда не должен противоречить собранному знания о производительности, так как повторение того, что не работает, не принесет никаких преимуществ, не так ли?
- уважайте ограничения вашей физической платформы , игнорирование их ухудшит вашу производительность
- тест, профиль, рефакторинг
- тест, профиль, рефакторинг
- тест, профиль, рефакторинг
других волшебных палочек здесь нет
и как только вы уже работаете на переднем крае производительности, установите gc.disable()
прежде чем вы создадите интерпретатор Python в N-множестве реплицированных копий, чтобы не ждать спонтанной сборки мусора при достижении максимальной производительности
Спасибо за ответ. Признаться, я не совсем понимаю все, что здесь написано. Но мне потребуется некоторое время, чтобы пройти через это, и я уверен, что это будет огромной помощью, когда я это сделаю :)
Благодарю за ваш ответ. Хотя это будет работать для этой упрощенной версии задачи, на самом деле элементы
myList
не являются целыми числами, а представляют собой большие массивы, поэтому создание парmyList[i[0]], myList[i[1]]
для всех комбинаций индексов списка будет хранить сотни копий этих большие массивы без надобности. Поэтому я не могу создатьindex_combns
таким образом. Я надеюсь, что смогу донести свою проблему.