Почему dask worker терпит неудачу из-за MemoryError в задаче «маленького» размера? [Dsk.bag]

Я запускаю конвейер на нескольких изображениях. Конвейер состоит из чтения изображений из файловой системы, обработки каждого из них, а затем сохранения изображений в файловой системе. Однако рабочий dask терпит неудачу из-за MemoryError. Есть ли способ гарантировать, что рабочие dask не загружают в память слишком много изображений? т. е. Подождите, пока на рабочем месте не будет достаточно места, прежде чем запускать конвейер обработки на новом изображении.

У меня есть один планировщик и 40 рабочих с 4 ядрами, 15 ГБ оперативной памяти и работающим Centos7. Я пытаюсь обработать 125 изображений в пакете; каждое изображение довольно большое, но достаточно маленькое, чтобы поместиться на рабочем; около 3 ГБ требуется для всего процесса.

Я попытался обработать меньшее количество изображений, и это прекрасно работает.

ОТРЕДАКТИРОВАНО

from dask.distributed import Client, LocalCluster

# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources = {'process': 1}))

paths = ['list', 'of', 'paths']

# Read the file data from each path
data = client.map(read, path, resources = {'process': 1)

# Apply foo to the data n times
for _ in range(n):
    data = client.map(foo, x, resources = {'process': 1)

# Save the processed data
data.map(save, x, resources = {'process': 1)

# Retrieve results
client.gather(data)

Я ожидал, что изображения будут обрабатываться, поскольку на рабочих процессах было доступно пространство, но похоже, что все изображения загружаются одновременно на разных рабочих процессах.

Обновлено: Мои проблемы в том, что все задачи назначаются рабочим, и им не хватает памяти. Я нашел, как ограничить количество задач, которые рабочий обрабатывает в один момент [https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process] (см. здесь]. Однако с этим ограничением, когда я выполняю свою задачу, все они завершают шаг чтения, затем шаг обработки и, наконец, шаг сохранения. Это проблема, так как образ проливается на диск.

Есть ли способ завершить каждую задачу, прежде чем начинать новую? например на Worker-1: прочитать (img1) -> обработать (img1) -> сохранить (img1) -> прочитать (img2) ->...

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
0
426
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Dask обычно не знает, сколько памяти потребуется задаче, он может знать только размер выходных данных, и то только после их завершения. Это связано с тем, что Dask просто выполняет функцию pthon, а затем ждет ее завершения; но все остальное может происходить внутри функции python. Как правило, вы должны ожидать, что начнется столько задач, сколько у вас есть доступных рабочих ядер — как вы обнаружите.

Если вам нужна меньшая общая нагрузка на память, то ваше решение должно быть простым: иметь достаточно небольшое количество рабочих процессов, чтобы, если все они используют максимальное количество памяти, которое вы можете ожидать, у вас все еще оставался запас в системе, чтобы справиться .

Обновлено: вы можете попробовать запустить оптимизацию на графике перед отправкой (хотя я думаю, что это должно произойти в любом случае), так как похоже, что ваши линейные цепочки задач должны быть «плавлены». http://docs.dask.org/en/latest/optimize.html

Мне удалось установить лимит ресурсов для каждого работника. Однако у меня есть некоторые проблемы с планированием задач (см. Редактирование).

mathdugre 12.04.2019 00:55

Попробовал предложенный вами метод. Однако я не знал, как получить dask_graph с помощью клиентского API, если это возможно, поэтому я вернулся к использованию Bag API. Это вернуло проблему ограничения ресурсов для рабочих. У вас есть идея, как ограничить рабочий ресурс при использовании dask.bag?

mathdugre 12.04.2019 04:59

обязательно используйте клиент для вычислений и ограничьте количество рабочих.

mdurant 12.04.2019 15:27

Мне удалось сделать это с помощью предохранителя и установки лимита ресурсов для каждого работника. Спасибо

mathdugre 12.04.2019 20:53

Другие вопросы по теме

Похожие вопросы

Можно ли назначить рабочие ресурсы для распределенного работника dask после создания?
Как получить результаты задач, когда они закончатся, а не после того, как все закончилось в Dask?
Как исправить ошибку «Столбцы в вычисляемых данных не соответствуют столбцам в предоставленных метаданных»?
Как я могу получить результат вычисления Dask на другом компьютере, отличном от того, который его отправил?
Как получить результат задачи в плагине планировщика dask
Ошибка «Переменная не выбирается» с scheduler = «processes» при использовании dask.array из файла netcdf
Генерировать перестановки для большого набора данных, используя dask и pandas
Добавление столбца в фрейм данных dask, вычисление его через скользящее окно
Простой способ реверсировать распределенный фрейм данных dask
Создание нового столбца фрейма данных dask с использованием значений из другого фрейма данных вызывает ошибку «размеры фрагментов неизвестны»