Многопроцессорная общая память для передачи больших массивов между процессами

Контекст:
Мне нужно проанализировать некоторые данные о погоде за каждый час года. Для каждого часа мне нужно прочитать некоторые входные данные для каждого часа, прежде чем выполнять какие-либо вычисления. Одним из этих входных данных является очень большой массив x, который не меняется и одинаков для каждого часа года. Результатом является вектор (1D-массив) y, который содержит результат расчета для каждого часа года.

Цель:
Ускорить время вычислений с помощью модуля многопроцессорности. В частности, я пытаюсь передать x каждому процессу, используя shared_memory подмодуль многопроцессорности.

Я использую CPython 3.10.8 в Windows 10, используя Spyder 5.3.3 в качестве IDE.

Код (упрощен для целей тестирования):

import multiprocessing
import numpy as np
from multiprocessing import shared_memory

def multiprocessing_function(args):
    nn, shm_name, y_shm_name, shape, dtype, N = args
    existing_shm = shared_memory.SharedMemory(name=shm_name)
    x = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
    existing_y_shm = shared_memory.SharedMemory(name=y_shm_name)
    y = np.ndarray((N,), dtype=dtype, buffer=existing_y_shm.buf)
    y[nn] = 1
    existing_shm.close()
    existing_y_shm.close()

if __name__ == '__main__':
    x = np.random.rand(int(1e7), 16)
    N = 8760 # Number of hours in a year
            
    dtype = x.dtype
    shm = shared_memory.SharedMemory(create=True, size=x.nbytes)
    shm_array = np.ndarray(x.shape, dtype=x.dtype, buffer=shm.buf)
    np.copyto(shm_array, x)
    y_shm = shared_memory.SharedMemory(create=True, size=N * x.itemsize)
    y_array = np.ndarray((N,), dtype=x.dtype, buffer=y_shm.buf)
    
    args_case = [(nn, shm.name, y_shm.name, x.shape, dtype, N) for nn in range(N)]
    with multiprocessing.Pool() as pool:
        pool.map(multiprocessing_function, args_case)
    
    y = np.array(y_array)

    shm.close()
    y_shm.close()
    shm.unlink()
    y_shm.unlink()

Проблема:
Когда я запускаю код, он возвращает правильный вектор, но в 50% случаев я получаю «Неустрашимое исключение Windows: нарушение прав доступа» и ядро ​​выходит из строя. Если я затем изменю размер массива, возможно, проблем не возникнет, но если я перезапущу Spyder и попытаюсь повторно запустить тот же код с новым размером массива, возникнет та же ошибка, и ядро ​​снова выйдет из строя. Такое непоследовательное поведение невероятно странно. У меня такое ощущение, что это проблема с утечкой памяти, но я не знаю, как ее исправить.

Цель: Ускориться? Прекратите использовать Spyder, прекратите использовать «дорогие» установки с нулевой добавленной стоимостью. Рефакторинг кода, чтобы стать взаимно независимым (отсутствие связи между процессами, нулевое совместное использование означает нулевую блокировку) и использование приемов высокой производительности, может использовать хорошо кэшированные .memmap()-ы для уменьшения объема оперативной памяти многих (полный интерпретатор Python) RAM копирует в Windows)-процессы, чтобы предотвратить сбои.

user3666197 17.07.2024 12:13

Мне не удалось воспроизвести это на своем компьютере (Python 3.10.8, Windows 10, PowerShell). Успешно в 10 случаях из 10. Сможете ли вы воспроизвести это без Spyder?

ken 17.07.2024 13:03

Приятно знать, @ken, но, при всем уважении, объем вашей оперативной памяти (независимо от ее размера), скорее всего, не будет соответствовать O/P, и хорошие новости с вашей стороны не помогут решить проблемы, которые остаются прежними - как перейти к O/P, чтобы устранить проблему на машине, которая использовалась и на которой сбои продолжают появляться снова.

user3666197 17.07.2024 13:18

Вы можете попробовать использовать диспетчер памяти вместо общей памяти. Я не уверен на 100%, как обеспечивается безопасность потоков с помощью объекта общей памяти. Но это при использовании диспетчера памяти. Тогда можно хотя бы исключить эту проблему.

RaJa 17.07.2024 15:00

Я новичок в параллельной обработке, поэтому понятия не имею, как ее исследовать. Я не пробовал делать это за пределами Spyder, возможно, попробую завтра. Однако мне удалось использовать многопроцессорный массив Raarray, чтобы сделать то, что я хотел. Возможно, это не оптимально, но обходной путь подходит для моих нужд, поэтому я просто соглашусь на это. Спасибо за комментарии!

blyatman 17.07.2024 16:24
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
5
87
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Либо сам Spyder, либо оболочка IPython пытается получить доступ к одному из ваших общих массивов numpy после закрытия файла общей памяти. Я предполагаю, что Spyder пытается заполнить панель «Обозреватель переменных» путем перечисления локальных переменных. Это приводит к доступу к массиву numpy, но ячейка памяти, на которую он указывает, больше недействительна, поскольку SharedMemory была закрыта.

SharedMemory создает файлы в файловой системе (чтобы ими можно было делиться) таким образом, что они будут находиться только в памяти (чтобы они были быстрыми). Затем вам предоставляется доступ к этому файлу в виде буфера с отображением в памяти. Есть некоторые различия в зависимости от ОС, но в целом это справедливо. Как и в случае с любым другим файлом, у вас есть немного больше ответственности за уборку: close() и unlink().

К сожалению, Numpy не имеет возможности узнать, что буфер, на который он указывает, закрыт, поэтому он попытается получить доступ к той же памяти, на которую указывал ранее. Windows называет это «Нарушением доступа», а все остальные называют это «Ошибкой сегментации».

Для решения этой проблемы:

  • Вы можете просто изменить настройки в Spyder, чтобы запускать сценарии на внешнем терминале и не получать доступ к массивам после закрытия shm.
  • Вы можете вызвать del shm_array и del y_array после того, как закончите с ними (прямо перед закрытием ШМ). Это удалит их из области действия модуля, и ядро ​​IPython не будет пытаться получить к ним доступ.
  • Вы можете поместить материал в функцию, чтобы ваши массивы numpy выходили за рамки и автоматически собирали мусор при возврате функции.
  • Вы можете просто не закрывать файлы shm: как и другие дескрипторы файлов, они закрываются при выходе из процесса Python. Закрытие текущей оболочки (и запуск новой) должно помочь. В Windows, если ни один процесс не имеет дескриптора какого-либо открытого SHM, он будет автоматически удален. В Linux (а может быть, в MacOS?) файл будет удален только при вызове unlink или при перезагрузке компьютера.

Это сработало как шарм! Я проверил ваши 2-е и 4-е предложения, и пока никаких проблем не возникло. Спасибо!

blyatman 20.07.2024 03:06

Другие вопросы по теме

Как эффективно выполнять параллельный поиск файлов с использованием pathlib `glob` в Python для больших структур каталогов?
Как использовать 128-битные числа с плавающей запятой и комплексные числа в OpenCL/CUDA?
Как я могу сделать прогрессивную полосу с помощью JProgressBar из Swing на Java?
«Ссылка на объект не установлена ​​на экземпляр объекта» при вызове метода NoteProperty, добавленного в параллельную обработку
Параллельная обработка MCE: копирование и изменение AoA
Более быстрый/параллельный способ объединения нескольких 3D-массивов Numpy в один существующий 3D-массив
Почему java.util.Collection#parallelStream утроит время выполнения при использовании одного потока?
Параллельный режим OpenMP существенно медленнее последовательного
Как сохранить два отдельных CSV-файла при распараллеливании кода?
OpenCL 1.2: согласованность глобальной памяти, связанная с атомарными операциями?