Я пытаюсь использовать multiprocessing для очень медленной задачи, выполняемой как единый процесс. Как вы можете видеть в приведенном ниже коде, каждый процесс должен возвращать некоторые результаты (return_dict). Сначала я протестировал этот код с набором данных из 10К строк (данные, хранящиеся в файле docs.txt, около 70 МБ), и код работал, как ожидалось. Однако, когда я использовал сценарий для полного набора данных (примерно 5,6 ГБ), я получил AssertionError, как показано в конце моего вопроса. Интересно, знает ли кто-нибудь, чем это могло быть вызвано, и как я могу этого избежать. Спасибо.
from multiprocessing import Process, Manager
import os, io, numpy
from gensim.models.doc2vec import Doc2Vec
def worker(i, data, return_dict):
model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
results = numpy.zeros((len(data), model.vector_size))
for id, doc in enumerate(data):
results[id,:] = model.infer_vector(doc, alpha = 0.01, steps = 100)
return_dict[i] = results
if __name__ == '__main__':
import time
a = time.time()
path = "D:\\Project1\\docs.txt" # <<=== data stored in this file
data = []
manager = Manager()
jobs = []
return_dict = manager.dict()
with io.open(path, "r+", encoding = "utf-8") as datafile:
for id, row in enumerate(datafile):
row = row.strip().split('\t')[0].split()
data.append(row)
step = numpy.floor(len(data)/20)
intervals = numpy.arange(0, len(data), step = int(step)).tolist()
intervals.append(len(data))
for i in range(len(intervals) - 1):
p = Process(target=worker, args=(i, data[intervals[i]:intervals[i+1]], return_dict))
jobs.append(p)
p.start()
for proc in jobs:
proc.join()
results = numpy.zeros((len(data), 1000))
start = 0
end = 0
for _, result in return_dict.items(): #<<===Where error happens
end = end + result.shape[0]
results[start:end,:] = result[:,:]
start = end
print(time.time() - a)
Сообщение об ошибке:
Traceback (most recent call last):
File "D:\Project1\multiprocessing_test.py", line 43, in <module>
for _, result in return_dict.items():
File "<string>", line 2, in items
File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 757, in _callmethod
kind, result = conn.recv()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv
buf = self._recv_bytes()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
return self._get_more_data(ov, maxsize)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
assert left > 0
AssertionError
память может быть проблемой? Result_dict содержит 20 элементов, каждый элемент представляет собой массив numpy размером примерно 425 КБ строк и 1000 столбцов значений с плавающей запятой. Мой компьютер имеет оперативную память 256 ГБ, во время фактической многопроцессорной фразы у компьютера было свободно 50% оперативной памяти. Когда произошла ошибка, я был за компьютером, поэтому не знаю, как тогда использовалась память.
Ну, раз уж у вас такой большой баран, думаю, это не будет проблемой с памятью. По сути, проблема заключается в том, что дочерние процессы не переносят все данные в основной процесс. Это испортилось, поскольку я думаю, что return_dict[i] = results должен быть блокирующей операцией. Я не могу отладить ваш код, поэтому не могу понять основную причину. Альтернативным решением может быть использование concurrent.futures.ProcessPoolExecutor, поскольку оно позволяет получить возвращаемое значение целевой функции: future = executor.submit(args) и result = future.result().
Возможный дубликат Python multiprocessing apply_async "assert left> 0" AssertionError






Полагаю, вы используете всю доступную память.
dict.items() создает копию вашего dict, заполненную всеми вашими элементами и использующую много памяти. Лучше просто использовать dict.iteritems() для перебора результата.
Обновлено: извините, я сначала не заметил тега python-3. В Python3 dict.items() больше не возвращает копию и его можно использовать.
Соответствующий код connection.py в многопроцессорной обработке:
left = _winapi.PeekNamedPipe(self._handle)[1]
assert left > 0
Вы используете окна? Итак, я предполагаю, что это проблема, связанная с Windows, кажется, что PeekNamedPipe возвращает 0.
Да, вы можете просто использовать items (). Python 3 items () - это Python 2 iteritems ().
Это мой случай и решение. Надеюсь, это поможет! У меня есть функция для обработки с именем "func"
partial_func = partial(func,a=params1,b=params2)
for i, _ in enumerate(pool.imap(partial_func, [1]))):
pass
основная причина - параметры params1 и params2, которые я передаю в "partial_func", слишком велики.
Как насчет использования памяти в течение всего процесса? Использование памяти должно быть между
2*5.6gbи3*5.6gb, включая виртуальную память.