Python: _pickle.PicklingError: невозможно рассолить <функция <лямбда>>

Я использую Python 3.9.1

Примечание. Я знаю, что есть вопросы с похожими названиями. Но эти вопросы встроены в сложный код, который затрудняет понимание проблемы. Это голая реализация проблемы, которую, я думаю, другим будет легче переварить.

Обновлено: у меня есть Pool(processes=64) в моем коде. Но большинству других, вероятно, придется изменить это в зависимости от того, сколько ядер есть на их компьютере. И если это займет слишком много времени, измените listLen на меньшее число

Я пытаюсь узнать о многопроцессорности, чтобы решить проблему на работе. У меня есть список массивов, с которыми мне нужно сделать попарное сравнение массивов. Но для простоты я воссоздал суть проблемы с простыми целыми числами вместо массивов и функцией сложения вместо вызова какой-то сложной функции сравнения. С приведенным ниже кодом я сталкиваюсь с титульной ошибкой

import time
from multiprocessing import Pool
import itertools
import random

def add_nums(a, b):
    return(a + b)

if __name__ == "__main__":
    listLen = 1000
    
    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [*itertools.combinations(range(len(myList)),2)]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(lambda x: add_nums(myList[x[0]], myList[x[1]]), index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
81
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Python не может обрабатывать лямбда-функции. Вместо этого вы должны определить функцию и вместо этого передать имя функции. Вот как вы можете подойти к этому:

import itertools
import random
import time
from multiprocessing import Pool


def add_nums(a, b):
    return a + b


def foo(x):
    return add_nums(x[0], x[1])


if __name__ == "__main__":
    listLen = 1000

    # Create a list of random numbers to do pairwise additions of
    myList = [random.choice(range(1000)) for i in range(listLen)]
    # Create a list of all pairwise combinations of the indices of the list
    index_combns = [
        (myList[i[0]], myList[i[1]])
        for i in itertools.combinations(range(len(myList)), 2)
    ]

    # Do the pairwise operation without multiprocessing
    start_time = time.time()
    sums_no_mp = [*map(foo, index_combns)]
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with no MP")

    # Do the pairwise operations with multiprocessing
    start_time = time.time()
    pool = Pool(processes=64)
    sums_mp = pool.map(foo, index_combns)
    end_time = time.time() - start_time
    print(f"Process took {end_time} seconds with MP")

    pool.close()
    pool.join()

Я изменил index_combns, чтобы также извлечь значение из myList на месте, потому что myList не будет доступен из foo, а передача нескольких копий myList увеличит сложность вашего скрипта.

Запуск этой печати:

Process took 0.053926944732666016 seconds with no MP
Process took 0.4799039363861084 seconds with MP

Благодарю за ваш ответ. Хотя это будет работать для этой упрощенной версии задачи, на самом деле элементы myList не являются целыми числами, а представляют собой большие массивы, поэтому создание пар myList[i[0]], myList[i[1]] для всех комбинаций индексов списка будет хранить сотни копий этих большие массивы без надобности. Поэтому я не могу создать index_combns таким образом. Я надеюсь, что смогу донести свою проблему.

The_Questioner 21.03.2022 20:00

При всем уважении, возможно, вы захотите исправить свое утверждение, что (цит.) "Python не может рассолить..." — это неверное утверждение. Python не может распарывать лямбда-выражения, если используется pickle-module. Python умеет мариновать лямбды при использовании dill-модуля (уже лет 10).

user3666197 22.03.2022 01:19
Ответ принят как подходящий

Я не совсем уверен, что Зачем (хотя тщательное чтение многопроцессорные документы, вероятно, дало бы ответ), но в многопроцессорной обработке python задействован процесс травления, когда дочерним процессам передаются определенные вещи. Хотя я ожидал, что лямбды будут унаследованы, а не переданы через рассола, я думаю, что это не то, что происходит.

После обсуждения в комментариях рассмотрим примерно такой подход:

import time
from multiprocessing import Pool
import itertools
import numpy as np
from multiprocessing import shared_memory

def add_mats(a, b):
    #time.sleep(0.00001)
    return (a + b)

# Helper for mp version
def add_mats_shared(shm_name, array_shape, array_dtype, i1, i2):
    shm = shared_memory.SharedMemory(name=shm_name)
    stacked = np.ndarray(array_shape, dtype=array_dtype, buffer=shm.buf)
    a = stacked[i1]
    b = stacked[i2]
    result = add_mats(a, b)
    shm.close()
    return result

if __name__ == "__main__":
    class Timer:
        def __init__(self):
            self.start = None
            self.stop  = None
            self.delta = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc_args):
            self.stop = time.time()
            self.delta = self.stop - self.start

    arrays = [np.random.rand(5,5) for _ in range(50)]
    index_combns = list(itertools.combinations(range(len(arrays)),2))

    # Helper for non-mp version
    def add_mats_pair(ij_pair):
        i, j = ij_pair
        a = arrays[i]
        b = arrays[j]
        return add_mats(a, b)

    with Timer() as t:
        # Do the pairwise operation without multiprocessing
        sums_no_mp = list(map(add_mats_pair, index_combns))

    print(f"Process took {t.delta} seconds with no MP")


    with Timer() as t:
        # Stack arrays and copy result into shared memory
        stacked = np.stack(arrays)
        shm = shared_memory.SharedMemory(create=True, size=stacked.nbytes)
        shm_arr = np.ndarray(stacked.shape, dtype=stacked.dtype, buffer=shm.buf)
        shm_arr[:] = stacked[:]

        with Pool(processes=32) as pool:
            processes = [pool.apply_async(add_mats_shared, (
                shm.name,
                stacked.shape,
                stacked.dtype,
                i,
                j,
            )) for (i,j) in index_combns]
            sums_mp = [p.get() for p in processes]

        shm.close()
        shm.unlink()

    print(f"Process took {t.delta} seconds with MP")

    for i in range(len(sums_no_mp)):
        assert (sums_no_mp[i] == sums_mp[i]).all()

    print("Results match.")

Он использует multiprocessing.shared_memory для совместного использования одного массива размерностей numpy (Н+1) (вместо списка массивов размерностей Н) между хост-процессом и дочерними процессами.

Другие вещи, которые отличаются, но не имеют значения:

  • Pool используется в качестве диспетчера контекста, чтобы избежать явного закрытия и присоединения к нему.
  • Timer — это простой контекстный менеджер для временных блоков кода.
  • Некоторые числа были скорректированы случайным образом
  • pool.map заменен на звонки pool.apply_async

pool.map тоже подойдет, но вам нужно создать список аргументов перед вызовом .map и распаковать его в рабочей функции, например:

with Pool(processes=32) as pool:
    args = [(
        shm.name,
        stacked.shape,
        stacked.dtype,
        i,
        j,
    ) for (i,j) in index_combns]
    sums_mp = pool.map(add_mats_shared, args)

# and 

# Helper for mp version
def add_mats_shared(args):
    shm_name, array_shape, array_dtype, i1, i2 = args
    shm = shared_memory.SharedMemory(name=shm_name)
    ....

Спасибо за помощь. Хотя я пытаюсь вызвать функцию add_nums для элементов myList, а не для индексов, должен ли я передавать всю myList в функцию при каждом вызове? Не вызовет ли это каких-либо проблем с производительностью?

The_Questioner 21.03.2022 19:55

@The_Questioner О, вы могли бы просто вызывать комбинации в myList напрямую, а не индексы. Я обновлю.

jedwards 21.03.2022 19:58

Спасибо еще раз. К сожалению, это невозможно для моей реальной задачи (хотя это будет работать для этой упрощенной версии), потому что в моей реальной задаче вместо простых целых чисел myList представляет собой список больших массивов. Таким образом, создание таких комбинаций приведет к созданию тысяч ненужных копий этих больших массивов.

The_Questioner 21.03.2022 20:04

Справедливо, редактирую...

jedwards 21.03.2022 20:07

Хм, я так и не разобрался. Мне было интересно, сможете ли вы заставить его работать, не проходя полностью myList снова и снова.

The_Questioner 21.03.2022 20:34

Извините, я никогда не обновлял его. Я думаю, вам понадобится что-то вроде Менеджер или Общая память. Чтобы эффективно обеспечивать myList дочерние процессы. myList - это список пустых массивов?

jedwards 21.03.2022 21:12

Да, это пустые массивы длиной ~ 70 000. Извините за поздний ответ.

The_Questioner 21.03.2022 21:43

@The_Questioner обновлен с использованием подхода shared_memory

jedwards 21.03.2022 22:11

Благодарю вас! Я читаю об общей памяти, но, похоже, это то, что мне нужно, чтобы обойти мою проблему. По какой-то причине MP-версия намного медленнее, чем не-MP, но я попытаюсь это выяснить. Моим главным препятствием была ошибка рассола, и вы очень помогли с этим. Спасибо!

The_Questioner 21.03.2022 22:49

@The_Questioner «По какой-то причине MP-версия намного медленнее, чем не-MP, но я попытаюсь это выяснить». Попробуйте раскомментировать строку time.sleep внутри функции add_mats (эмулируя более дорогой обратный вызов worker), и вы должны увидеть изменение производительности, как и ожидали.

jedwards 21.03.2022 22:58

@jedwards - ваше предложение игнорирует все дополнительные расходы, которые убивают производительность, когда масштабы увеличиваются (здесь это обычное явление) - (а) операции со списками «хороши» в школьных учебниках, как SLOC, но убийцы в больших Оперативная память (как в данном случае), тем хуже, если внутри «хороших» итераторов (б) затраты на добавление интерпретатора Python для создания экземпляра процесса являются огромными блокировщиками — посмотрите на влияние закона Амдала, если мы начнем не игнорировать дополнительные затраты (инструкции никогда не присутствуют в чистом [SERIAL] базовом запуске) stackoverflow.com/revisions/18374629/3 (c) последнее, но не менее важное, обработанный блок numpy ...

user3666197 21.03.2022 23:07

@user3666197 user3666197 У меня действительно нет предложения. ОП представил пример проблемы, когда они хотели поделиться большим набором массивов без многократного копирования каждого, и это решает эту проблему. Поскольку это примерная проблема, у меня нет возможности узнать, является ли MP правильным маршрутом или нет, но я предполагаю, что они смогут выяснить это довольно быстро, как только заработают. С точки зрения (с) неясно, что вы имеете в виду. Если вы говорите о GIL и потоках, то это процессы. Если вы говорите о пустых библиотеках, таких как MKL или BLAS, это выходит за рамки этого вопроса, imo.

jedwards 21.03.2022 23:17

Согласитесь, что O/P заявил о желании учиться. Тем не менее, лямбда-маринование является маргинальным блокировщиком в этом, совместное использование является убийцей производительности как внутри экосистемы интерпретатора Python (на основе потоков, из-за блокировки GIL), так и в экзосистеме (на основе процессов, из-за снижения производительности). "-эмуляция накладных расходов конвейера SER/xfer/DES между процессами, чем больше блокирует независимость, процессы были порождены для получения, не так ли? ) в (c) опечатка /извините/ аргумент был в том, что чем больше процессов занимают ядра ЦП, тем меньше ядер остается свободным для собственных многоядерных запусков уровня HPC.

user3666197 22.03.2022 01:10

ИМХО, конфликтующие анти-паттерны здесь не выходят за рамки, так как в результате производительность только пострадает. Наличие явного оператора O/P о том, что код использует большой-список большой-numpy-массивов, является мигающим предупреждением о красном свете, это никогда не будет работать быстро в поэтапном P2P-распределении последовательного итератора исчисления парных списков-членов. Можно не говорить об этом О/П, но в таком случае мы не дали здесь честного совета, не так ли? Я не пытаюсь умолять вступать в наномасштабные споры о (не)преимуществах MKL против BLAS, но оставлять O/P внутри гигантской дыры расширяющегося анти-паттерна?

user3666197 22.03.2022 01:16

Q :
" ... trying to learn about multiprocessing in order to solve a problem at work. "

А:
единственный самый важный опыт, который нужно изучить
, это то, насколько БОЛЬШИМ являются РАСХОДЫ-из-(процесса)-ИНСТАНЦИЯ(s),
все остальные дополнительные накладные расходы
(но ни в коем случае не незначительные, тем более в увеличение масштабов проблемы )
детали по сравнению с этой огромной и главной.

Before the answer is read-through and completely understood, here is a Live GUI-interactive simulator of how much we will have to pay to start using more than 1 stream of process-flow orchestration ( costs vary - lower for threads, larger for MPI-based distributed operations, highest for multiprocessing-processes, as used in Python Interpreter, where N-many copies of the main Python Interpreter process get first copied (RAM-allocated and O/S scheduler spawned - as 2022-Q2 still reports issues if less expensive backends try to avoid this cost, yet at problems with deadlocking on wrong-shared or ill-copied or forgotten to copy some already blocking MUTEX-es and similar internalities - so that even the full-copy is not always safe in 2022 - not having met them in person does not mean these do not still exist, as documented by countless professionals - a story about a pool of sharks is a good place to start from )

Live Simulatordescription here

Инвентаризация проблем:

а) травление лямбд ( and many other SER/DES blockers )

легко — достаточно conda install dill и import dill as pickle как умеет укроп, годами травить их — кредит @MikeMcKearns и вашему коду не нужно рефакторить использование простого интерфейса pickle.dumps()-вызова. Таким образом, использование pathos.multiprocess по умолчанию используется для внутреннего использования dill, и этой уже давно известной слабости multiprocessing SER/DES удается избежать.

б) убийцы производительности

- multiprocessing.Pool.map() это скорее End-to-End анти-шаблон производительности здесь - Расходы..., если мы начнем не пренебрегать ими, покажите, сколько CPU-тактов и заблокированных физических-RAM-вводов/выводов платится за столько процессов- экземпляров ( 60+ ), которые, наконец, «занимают» почти все физические ядра ЦП, но оставляют, таким образом, почти нулевое пространство для действительно высокой производительности numpy-родных многоядерных вычислений основной проблемы (для которой ожидалось, что максимальная производительность будет форсирован, не так ли? )

- просто переместите ползунок п в симуляторе на что-то меньшее, чем 100% (без [SERIAL]-части выполнения задачи, что хорошо в теории, но никогда не выполнимо на практике, даже запуск программы является чистым [SERIAL], по замыслу )

- просто переместите ползунок Оverhead в симуляторе на что-нибудь выше простого нуля (выражая относительную дополнительную стоимость создания одного из процессов НCPUcores, как число процентов, по отношению к такому номеру части [PARALLEL]-раздела инструкций - математически "плотная" работа имеет много таких "полезных"-инструкций и может , предполагая, что никакие другие убийцы производительности не выпрыгивают из коробки, могут потратить некоторую разумную сумму «дополнительных» затрат, чтобы породить некоторое количество одновременных или параллельных операций (фактическое число зависит только от фактического экономия затрат, а не от того, сколько CPU-ядра присутствуют, тем более по нашим "пожеланиям" или схоластическим или еще хуже копипаст-"советам" ). Наоборот, математически «мелкая» работа почти всегда имеет «ускорение» << 1 (замедление огромный), так как почти нет шансов оправдать известные дополнительные затраты (оплата на процесс-инстанциациях, данные SER/xfer /DES перемещается внутрь (параметры) и назад (результаты)

- далее переместите ползунок Оverhead в симуляторе к самому правому краю == 1. Это показывает случай, когда фактические накладные расходы (потеря времени), порождающие процесс, по-прежнему не превышают <= 1 % всех последующих инструкций, связанных с вычислениями, которые будут выполняться для «полезной» части. работы, которая будет вычисляться внутри такого порожденного экземпляра процесса. Таким образом, даже такой коэффициент пропорции 1:100 (выполнение в 100 раз больше «полезной» работы, чем потерянное процессорное время, сожженное для организации такого количества копий и заставляющего O/S-планировщика организовывать их параллельное выполнение внутри доступной системной виртуальной памяти) имеет уже все предупреждения видны на графике прогрессии Speedup-деградации - просто поиграйте немного с Оverhead-слайдером в симуляторе, прежде чем трогать остальные...

- избежать греха «совместного использования» (если целью является производительность) - опять же, затраты на эксплуатацию такой оркестровки между несколькими процессами интерпретатора Python, теперь независимыми, влекут за собой дополнительные дополнительные затраты, которые никогда не оправдываются повышением производительности, поскольку борьба за занимая общие ресурсы (ядра ЦП, каналы физической ОЗУ-ввода-вывода) только опустошает коэффициенты повторного использования кэш-памяти процессора, переключатели контекста процесса, управляемые планировщиком O/S, и все это еще больше снижает итоговую -Конец производительности (чего мы не хотим, не так ли?)

в) повышение работоспособности

- уважайте факты о фактические затраты любой вычислительной операции
- избегайте "поверхностных" шагов вычислений,
- максимизируйте то, что так дорого обходится в набор распределенных-процессов (если такая необходимость остается),
- избегайте все операции добавления служебных данных (например, добавление локальных временных переменных, где встроенные операции позволяют вместо хранения частичных результатов)
и
- попробуйте перейти к использованию сверхпроизводительного, дружественного к кэшу и оптимизированного, родного numpy-векторизованного многоядерного и пошагового возможности трюков, не блокируемые предварительно перегруженными ядрами ЦП за счет планирования такого количества (~ 60) копий процесса Python Interpreter, каждая из которых пытается вызвать numpy-код, таким образом, не имея свободных ядер для фактического размещения такого высокопроизводительного кэша. -удобные для повторного использования векторные вычисления (здесь мы получаем или теряем большую часть производительности, а не в медленных последовательных итераторах, не в порождении 60+ основанных на процессах полных копий «__main__» интерпретатора Python, прежде чем выполнять один кусок полезной работы на наших больших данных, дорого РА M-распределено и физически скопировано более 60 раз)

- рефакторинг реальной проблемы никогда не должен противоречить собранному знания о производительности, так как повторение того, что не работает, не принесет никаких преимуществ, не так ли?
- уважайте ограничения вашей физической платформы , игнорирование их ухудшит вашу производительность
- тест, профиль, рефакторинг
- тест, профиль, рефакторинг
- тест, профиль, рефакторинг
других волшебных палочек здесь нет

и как только вы уже работаете на переднем крае производительности, установите gc.disable() прежде чем вы создадите интерпретатор Python в N-множестве реплицированных копий, чтобы не ждать спонтанной сборки мусора при достижении максимальной производительности

Спасибо за ответ. Признаться, я не совсем понимаю все, что здесь написано. Но мне потребуется некоторое время, чтобы пройти через это, и я уверен, что это будет огромной помощью, когда я это сделаю :)

The_Questioner 22.03.2022 18:46
Всегда пожалуйста @The_Questioner - ваше математическое воображение и знание фактической физики, того, как работают внутри наши компьютерные игрушки, помогут вам следовать логике как оригинальных, так и дополнительных накладных расходов с учетом «Закон убывающей отдачи»
user3666197 22.03.2022 21:40

Другие вопросы по теме