Python multiprocessing.Processgetting генерирует AssertionError при попытке получить возвращенные результаты?

Я пытаюсь использовать multiprocessing для очень медленной задачи, выполняемой как единый процесс. Как вы можете видеть в приведенном ниже коде, каждый процесс должен возвращать некоторые результаты (return_dict). Сначала я протестировал этот код с набором данных из 10К строк (данные, хранящиеся в файле docs.txt, около 70 МБ), и код работал, как ожидалось. Однако, когда я использовал сценарий для полного набора данных (примерно 5,6 ГБ), я получил AssertionError, как показано в конце моего вопроса. Интересно, знает ли кто-нибудь, чем это могло быть вызвано, и как я могу этого избежать. Спасибо.

from multiprocessing import Process, Manager
import os, io, numpy
from gensim.models.doc2vec import Doc2Vec

def worker(i, data, return_dict):
    model = Doc2Vec.load("D:\\Project1\\doc2vec_model_DM_20180814.model")
    results = numpy.zeros((len(data), model.vector_size))
    for id, doc in enumerate(data):
        results[id,:] = model.infer_vector(doc, alpha = 0.01, steps = 100)
    return_dict[i] = results

if __name__ == '__main__':
    import time
    a  = time.time()
    path = "D:\\Project1\\docs.txt"    # <<=== data stored in this file
    data = []
    manager = Manager()
    jobs = []
    return_dict = manager.dict()

    with io.open(path, "r+", encoding = "utf-8") as datafile:
        for id, row in enumerate(datafile):
            row = row.strip().split('\t')[0].split()
            data.append(row)

    step = numpy.floor(len(data)/20)
    intervals = numpy.arange(0, len(data), step = int(step)).tolist()
    intervals.append(len(data))

    for i in range(len(intervals) - 1):
        p = Process(target=worker, args=(i, data[intervals[i]:intervals[i+1]], return_dict))
        jobs.append(p)
        p.start()
    for proc in jobs:
        proc.join()

    results = numpy.zeros((len(data), 1000))
    start = 0
    end = 0
    for _, result in return_dict.items():    #<<===Where error happens
        end = end + result.shape[0]
        results[start:end,:] = result[:,:]
        start = end

    print(time.time() - a)

Сообщение об ошибке:

Traceback (most recent call last):
  File "D:\Project1\multiprocessing_test.py", line 43, in <module>
    for _, result in return_dict.items():
  File "<string>", line 2, in items
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\managers.py", line 757, in _callmethod
    kind, result = conn.recv()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 318, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\connection.py", line 337, in _get_more_data
    assert left > 0
AssertionError

Как насчет использования памяти в течение всего процесса? Использование памяти должно быть между 2*5.6gb и 3*5.6gb, включая виртуальную память.

Sraw 17.08.2018 16:00

память может быть проблемой? Result_dict содержит 20 элементов, каждый элемент представляет собой массив numpy размером примерно 425 КБ строк и 1000 столбцов значений с плавающей запятой. Мой компьютер имеет оперативную память 256 ГБ, во время фактической многопроцессорной фразы у компьютера было свободно 50% оперативной памяти. Когда произошла ошибка, я был за компьютером, поэтому не знаю, как тогда использовалась память.

Alex 17.08.2018 16:18

Ну, раз уж у вас такой большой баран, думаю, это не будет проблемой с памятью. По сути, проблема заключается в том, что дочерние процессы не переносят все данные в основной процесс. Это испортилось, поскольку я думаю, что return_dict[i] = results должен быть блокирующей операцией. Я не могу отладить ваш код, поэтому не могу понять основную причину. Альтернативным решением может быть использование concurrent.futures.ProcessPoolExecutor, поскольку оно позволяет получить возвращаемое значение целевой функции: future = executor.submit(args) и result = future.result().

Sraw 17.08.2018 16:37

Возможный дубликат Python multiprocessing apply_async "assert left> 0" AssertionError

Alex 02.09.2018 23:03
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
4
322
2

Ответы 2

Полагаю, вы используете всю доступную память. dict.items() создает копию вашего dict, заполненную всеми вашими элементами и использующую много памяти. Лучше просто использовать dict.iteritems() для перебора результата.

Обновлено: извините, я сначала не заметил тега python-3. В Python3 dict.items() больше не возвращает копию и его можно использовать.

Соответствующий код connection.py в многопроцессорной обработке:

left = _winapi.PeekNamedPipe(self._handle)[1]
assert left > 0

Вы используете окна? Итак, я предполагаю, что это проблема, связанная с Windows, кажется, что PeekNamedPipe возвращает 0.

Да, вы можете просто использовать items (). Python 3 items () - это Python 2 iteritems ().

Frieder 17.08.2018 15:54

Это мой случай и решение. Надеюсь, это поможет! У меня есть функция для обработки с именем "func"

partial_func = partial(func,a=params1,b=params2)
for i, _ in enumerate(pool.imap(partial_func, [1]))):
    pass

основная причина - параметры params1 и params2, которые я передаю в "partial_func", слишком велики.

Другие вопросы по теме