Я запускаю конвейер на нескольких изображениях. Конвейер состоит из чтения изображений из файловой системы, обработки каждого из них, а затем сохранения изображений в файловой системе. Однако рабочий dask терпит неудачу из-за MemoryError. Есть ли способ гарантировать, что рабочие dask не загружают в память слишком много изображений? т. е. Подождите, пока на рабочем месте не будет достаточно места, прежде чем запускать конвейер обработки на новом изображении.
У меня есть один планировщик и 40 рабочих с 4 ядрами, 15 ГБ оперативной памяти и работающим Centos7. Я пытаюсь обработать 125 изображений в пакете; каждое изображение довольно большое, но достаточно маленькое, чтобы поместиться на рабочем; около 3 ГБ требуется для всего процесса.
Я попытался обработать меньшее количество изображений, и это прекрасно работает.
ОТРЕДАКТИРОВАНО
from dask.distributed import Client, LocalCluster
# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources = {'process': 1}))
paths = ['list', 'of', 'paths']
# Read the file data from each path
data = client.map(read, path, resources = {'process': 1)
# Apply foo to the data n times
for _ in range(n):
data = client.map(foo, x, resources = {'process': 1)
# Save the processed data
data.map(save, x, resources = {'process': 1)
# Retrieve results
client.gather(data)
Я ожидал, что изображения будут обрабатываться, поскольку на рабочих процессах было доступно пространство, но похоже, что все изображения загружаются одновременно на разных рабочих процессах.
Обновлено: Мои проблемы в том, что все задачи назначаются рабочим, и им не хватает памяти. Я нашел, как ограничить количество задач, которые рабочий обрабатывает в один момент [https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process] (см. здесь]. Однако с этим ограничением, когда я выполняю свою задачу, все они завершают шаг чтения, затем шаг обработки и, наконец, шаг сохранения. Это проблема, так как образ проливается на диск.
Есть ли способ завершить каждую задачу, прежде чем начинать новую? например на Worker-1: прочитать (img1) -> обработать (img1) -> сохранить (img1) -> прочитать (img2) ->...
Dask обычно не знает, сколько памяти потребуется задаче, он может знать только размер выходных данных, и то только после их завершения. Это связано с тем, что Dask просто выполняет функцию pthon, а затем ждет ее завершения; но все остальное может происходить внутри функции python. Как правило, вы должны ожидать, что начнется столько задач, сколько у вас есть доступных рабочих ядер — как вы обнаружите.
Если вам нужна меньшая общая нагрузка на память, то ваше решение должно быть простым: иметь достаточно небольшое количество рабочих процессов, чтобы, если все они используют максимальное количество памяти, которое вы можете ожидать, у вас все еще оставался запас в системе, чтобы справиться .
Обновлено: вы можете попробовать запустить оптимизацию на графике перед отправкой (хотя я думаю, что это должно произойти в любом случае), так как похоже, что ваши линейные цепочки задач должны быть «плавлены». http://docs.dask.org/en/latest/optimize.html
Попробовал предложенный вами метод. Однако я не знал, как получить dask_graph с помощью клиентского API, если это возможно, поэтому я вернулся к использованию Bag API. Это вернуло проблему ограничения ресурсов для рабочих. У вас есть идея, как ограничить рабочий ресурс при использовании dask.bag?
обязательно используйте клиент для вычислений и ограничьте количество рабочих.
Мне удалось сделать это с помощью предохранителя и установки лимита ресурсов для каждого работника. Спасибо
Мне удалось установить лимит ресурсов для каждого работника. Однако у меня есть некоторые проблемы с планированием задач (см. Редактирование).