Я пытаюсь перебрать более 100 000 изображений, захватить некоторые функции изображения и сохранить полученный фрейм данных на диске в виде файла рассола.
К сожалению, из-за ограничений ОЗУ я вынужден разбить изображения на куски по 20 000 и выполнить над ними операции, прежде чем сохранять результаты на диск.
Код, написанный ниже, должен сохранить фрейм данных результатов для 20 000 изображений перед запуском цикла для обработки следующих 20 000 изображений.
Однако, похоже, это не решает мою проблему, поскольку память не освобождается из ОЗУ в конце первого цикла for.
Так вот где-то при обработке 50-тысячной записи программа вылетает из-за Out of Memory Error.
Я попытался удалить объекты после их сохранения на диск и вызова сборщика мусора, однако использование ОЗУ, похоже, не снижается.
Что мне не хватает?
#file_list_1 contains 100,000 images
file_list_chunks = list(divide_chunks(file_list_1,20000))
for count,f in enumerate(file_list_chunks):
# make the Pool of workers
pool = ThreadPool(64)
results = pool.map(get_image_features,f)
# close the pool and wait for the work to finish
list_a, list_b = zip(*results)
df = pd.DataFrame({'filename':list_a,'image_features':list_b})
df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")
del list_a
del list_b
del df
gc.collect()
pool.close()
pool.join()
print("pool closed")
Из кода вы можете видеть, что я использовал del, а также вызывал сборщик мусора, но, похоже, он не ведет себя так, как вы описали.
proc.get_memory_info(), чтобы сравнить использование памяти до и после GC. Вы также можете невольно фрагментировать свою кучу, которую сборщик мусора Python может или не может дефрагментировать для вас (что приводит к увеличению использования памяти, даже когда вы «удаляете и собираете» эти мертвые объекты).
Не используйте потоки для задач с интенсивным использованием ЦП, вместо этого используйте процессы. В любом случае, не устанавливайте количество параллельных задач больше, чем количество процессоров на вашем компьютере.
Что происходит внутри get_image_features? То, что вы делаете в своем фрагменте, прекрасно.
@will Я звоню в конечную точку REST API. Это не задача, связанная с процессором. Поэтому я использую threads.
отвечать @Andy Hayden определенно подходит для этого.
у тебя есть value = threading.local()? Без каких-либо подробностей про get_image_features непонятно, что там происходит -- может быть, он использует какой-то "кеш", который растет с каждым обработанным файлом, может быть, он не закрывает файлы после обработки, может быть, он хранит ссылки на какие-то объекты, построенные из файлов, и эти объекты едят всю память. Если вы делаете какие-то удаленные запросы, то, в зависимости от библиотеки, там тоже что-то может быть "кэшировано".
Также вы можете немного «профилировать», распечатав топ-10 самых больших объектов из globals(): var_sizes = {}; for var_name, var_value in globals().items(): var_sizes[var_name] = sys.getsizeof(var_value); [print(f"{var_name}: {size}") for (var_name, size) in sorted(var_sizes.items(), key=lambda k_v: k_v[1])[:10]]. Но для списков/диктов/контейнеров вы также должны добавить размер хранимых элементов (подробнее см. stackoverflow.com/a/30316760/952437)
также pandas.DataFrame иногда может протекать. Подробности и некоторые подходы к исправлению см. на github: github.com/pandas-dev/pandas/issues/2659.
Вы должны учитывать while gc.collect(): pass принудительную коллекцию, так как каждая итерация может собирать больше объектов, на которые сейчас нет ссылок.






Примечание: это не ответ, а краткий список вопросов и предложений.
ThreadPool()from multiprocessing.pool? Это не очень хорошо задокументировано (в python3), и я бы предпочел использовать ThreadPoolExecutor (также см. здесь)sys.getsizeof(), чтобы вернуть список всех объявленных globals() вместе с их объемом памяти.del results (хотя, я думаю, это не должно быть слишком большим)НЕ вызывайте list(), он создает в памяти список того, что возвращается из Division_chunks(). Вот где, вероятно, происходит ваша проблема с памятью.
Вам не нужны все эти данные в памяти сразу. Просто перебирайте имена файлов по одному, чтобы все данные не находились в памяти сразу.
Пожалуйста, опубликуйте трассировку стека, чтобы у нас было больше информации
Сомневаюсь. Это просто разделение списка имен файлов на более мелкие подсписки.
Короче говоря, вы не можете освободить память обратно в интерпретаторе Python. Лучше всего использовать многопроцессорность, поскольку каждый процесс может обрабатывать память самостоятельно.
Сборщик мусора "освободит" память, но не в том контексте, в котором вы могли бы ожидать. Обработку страниц и пулов можно изучить в исходном коде CPython. Здесь также есть статья высокого уровня: https://realpython.com/python-memory-management/
GC автоматически собирает динамически сохраняемые данные. Для повторно используемых или статических значений вам понадобится gc.collect(), например встроенные типы int, char и т. д.
Я думаю, что это будет возможно с сельдерей, благодаря celery вы можете легко использовать параллелизм и параллелизм с python.
Обработка изображений кажется идемпотентной и атомарной, поэтому это может быть сельдерей задача.
Вы можете запустить несколько рабочих, который будет обрабатывать задачи - работать с изображением.
Кроме того, у него есть конфигурация для утечек памяти.
Вопрос был об использовании памяти, а не о том, как распараллелить задачу.
Мое решение таких проблем — использовать какой-нибудь инструмент параллельной обработки. Я предпочитаю Joblib, так как он позволяет распараллелить даже локально созданные функции (которые являются «деталями реализации», поэтому лучше не делать их глобальными в модуле). Другой мой совет: не используйте потоки (и пулы потоков) в python, вместо этого используйте процессы (и пулы процессов) — это почти всегда лучшая идея! Просто не забудьте создать пул как минимум из 2 процессов в joblib, иначе он будет запускать все в исходном процессе python, и поэтому ОЗУ в конце концов не будет освобождено. После автоматического закрытия рабочих процессов joblib выделенная ими оперативная память будет полностью освобождена ОС. Мое любимое оружие — joblib.Parallel. Если вам нужно передать воркерам большие данные (т.е. больше 2 ГБ), используйте joblib.dump (для записи объекта python в файл в основном процессе) и joblib.load (для чтения в рабочем процессе).
О del object: в питоне команда на самом деле не удаляет объект. Это только уменьшает его счетчик ссылок. Когда вы запускаете import gc; gc.collect(), сборщик мусора сам решает, какую память освободить, а какую оставить выделенной, и я не знаю, как заставить его освободить всю возможную память. Еще хуже, если бы какая-то память на самом деле была выделена не питоном, а, например, в каком-то внешнем коде на C/C++/Cython/и т.д., и код не связывал бы счетчик ссылок python с памятью, то вам было бы абсолютно нечего делать. мог сделать, чтобы освободить его из python, за исключением того, что я написал выше, то есть путем завершения процесса python, который выделил ОЗУ, и в этом случае ОС будет гарантированно освобождена. Вот почему единственный 100% надежный способ освободить часть памяти в python — запустить код, который выделяет ее в параллельном процессе, а затем завершить процесс.
Теперь может случиться так, что что-то в 50 000-м очень велико, и это вызывает OOM, поэтому, чтобы проверить это, я сначала попробую:
file_list_chunks = list(divide_chunks(file_list_1,20000))[30000:]
Если произойдет сбой на 10 000, это подтвердит, является ли 20 КБ слишком большим размером фрагмента, или если он снова не сработает на 50 000, есть проблема с кодом...
Ладно, по коду...
Во-первых, вам не нужен явный конструктор list, в питоне гораздо лучше выполнять итерацию, а не генерировать весь список в память.
file_list_chunks = list(divide_chunks(file_list_1,20000))
# becomes
file_list_chunks = divide_chunks(file_list_1,20000)
Я думаю, вы можете неправильно использовать ThreadPool здесь:
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.
Это читается как close, возможно, некоторые мысли все еще работают, хотя я думаю, что это безопасно, оно кажется немного не питоническим, лучше использовать диспетчер контекста для ThreadPool:
with ThreadPool(64) as pool:
results = pool.map(get_image_features,f)
# etc.
Явные dels в python на самом деле не гарантируется освобождение памяти.
Вы должны собрать после при соединении/после with:
with ThreadPool(..):
...
pool.join()
gc.collect()
Вы также можете попробовать разделить это на более мелкие части, например. 10 000 или даже меньше!
Одна вещь, которую я хотел бы сделать здесь, вместо использования pandas DataFrames и больших списков — это использовать базу данных SQL, вы можете сделать это локально с помощью sqlite3:
import sqlite3
conn = sqlite3.connect(':memory:', check_same_thread=False) # or, use a file e.g. 'image-features.db'
и используйте контекстный менеджер:
with conn:
conn.execute('''CREATE TABLE images
(filename text, features text)''')
with conn:
# Insert a row of data
conn.execute("INSERT INTO images VALUES ('my-image.png','feature1,feature2')")
Таким образом, нам не придется обрабатывать большие объекты списка или DataFrame.
Вы можете передать соединение каждому из потоков... вам может понадобиться что-то немного странное, например:
results = pool.map(get_image_features, zip(itertools.repeat(conn), f))
Затем, после завершения расчета, вы можете выбрать все из базы данных в любом удобном для вас формате. Например. используя read_sql.
Используйте здесь подпроцесс, вместо того, чтобы запускать его в том же экземпляре python «shell out» для другого.
Поскольку вы можете передать start и end в python как sys.args, вы можете нарезать их:
# main.py
# a for loop to iterate over this
subprocess.check_call(["python", "chunk.py", "0", "20000"])
# chunk.py a b
for count,f in enumerate(file_list_chunks):
if count < int(sys.argv[1]) or count > int(sys.argv[2]):
pass
# do stuff
Таким образом, подпроцесс будет правильно очищать python (утечек памяти не будет, так как процесс будет завершен).
Могу поспорить, что Hammer 1 - это то, что вам нужно, кажется, что вы склеиваете много данных и без необходимости читаете их в списках python, а использование sqlite3 (или какой-либо другой базы данных) полностью избегает этого.
Спасибо, Энди, у меня не было возможности попробовать эти подходы. На данный момент я закрываю награду и обновлю этот комментарий, как только у меня будет возможность попробовать эти подходы.
Ваша проблема в том, что вы используете многопоточность там, где следует использовать многопроцессорность (с привязкой к ЦП или с привязкой к вводу-выводу).
Я бы реорганизовал ваш код примерно так:
from multiprocessing import Pool
if __name__ == '__main__':
cpus = multiprocessing.cpu_count()
with Pool(cpus-1) as p:
p.map(get_image_features, file_list_1)
а затем я бы изменил функцию get_image_features, добавив (что-то вроде) эти две строки в конец. Я не могу сказать, как именно вы обрабатываете эти изображения, но идея состоит в том, чтобы сделать каждое изображение внутри каждого процесса, а затем сразу же сохранить его на диск:
df = pd.DataFrame({'filename':list_a,'image_features':list_b})
df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")
Таким образом, фрейм данных будет собираться и сохраняться внутри каждого процесса, а не после его выхода. Процессы очищаются из памяти, как только они завершаются, поэтому это должно работать, чтобы сохранить низкий объем памяти.
pd.DataFrame(...) может протекать на некоторых сборках Linux (см. github проблема и "обходной путь"), поэтому даже del df может не помочь.
В вашем случае решение с github можно использовать без обезьяньего исправления pd.DataFrame.__del__:
from ctypes import cdll, CDLL
try:
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)
except (OSError, AttributeError):
libc = None
if no libc:
print("Sorry, but pandas.DataFrame may leak over time even if it's instances are deleted...")
CHUNK_SIZE = 20000
#file_list_1 contains 100,000 images
with ThreadPool(64) as pool:
for count,f in enumerate(divide_chunks(file_list_1, CHUNK_SIZE)):
# make the Pool of workers
results = pool.map(get_image_features,f)
# close the pool and wait for the work to finish
list_a, list_b = zip(*results)
df = pd.DataFrame({'filename':list_a,'image_features':list_b})
df.to_pickle("PATH_TO_FILE"+str(count)+".pickle")
del df
# 2 new lines of code:
if libc: # Fix leaking of pd.DataFrame(...)
libc.malloc_trim(0)
print("pool closed")
P.S. Это решение не поможет, если какой-либо отдельный фрейм данных слишком велик. Этому можно помочь, только уменьшив CHUNK_SIZE
Я думаю, что в питоне у нас нет возможности освобождать память. Но мы можем удалить объект Python с помощью команды
del.