Для каждой строки текстового файла я хочу выполнить тяжелые вычисления. Количество строк может достигать миллионов, поэтому я использую многопроцессорность:
num_workers = 1
with open(my_file, 'r') as f:
with multiprocessing.pool.ThreadPool(num_workers) as pool:
for data in pool.imap(my_func, f, 100):
print(data)
Поэтому я тестирую в интерактивном режиме ThreadPool()
(будет заменено в финальной версии). В документации map
или imap
говорится:
Этот метод разбивает итерируемый объект на несколько фрагментов, которые он передает в пул процессов как отдельные задачи.
Поскольку мой открытый файл является итерируемым (каждая итерация представляет собой строку), я ожидаю, что это сработает, но строки разбиваются посередине. Я сделал генератор, который возвращает строки как положено, но я хочу понять, зачем вообще нужен генератор. Почему фрагментирование не происходит на границах строк?
ОБНОВЛЯТЬ:
чтобы уточнить, использую ли я этот генератор:
def chunked_reader(file, chunk_size=100):
with open(file, 'r') as f:
chunk = []
i = 0
for line in f:
if i == chunk_size:
yield chunk
chunk = []
i = 0
i += 1
chunk.append(line)
# yield final chunk
yield chunk
расчет работает, возвращает ожидаемые значения. Если я использую файловый объект напрямую, я получаю ошибки, указывающие на то, что фрагментирование не разбивается на границы строк.
ОБНОВЛЕНИЕ 2:
он разбивает строку на каждый отдельный символ, а не построчно. Я также могу использовать другую среду с Python 3.9, и она будет иметь такое же поведение. Так было уже некоторое время, и кажется, что это «так, как задумано», но не очень интуитивно понятно.
ОБНОВЛЕНИЕ 3:
Чтобы прояснить отмеченное решение, я понял, что chunk_size отправит список данных для обработки в my_func. Внутренне my_func перебирает переданные данные. Но поскольку мое предположение неверно и каждая строка отправляется в my_func отдельно независимо от chunk_size, внутренняя итерация выполнялась по строке, а не по списку строк, как я ожидал.
Это Python 3.12.4 для Windows
вы думаете, что функция вызывается с часком в качестве входных данных, поэтому вы перебираете входные данные внутри my_func
. Это не. Он всегда вызывается с помощью одной строки или одного элемента итерируемого объекта. Отсюда мой ответ...
Хорошо, наконец-то понял, и это объясняет, почему проблема возникает, когда сама my_func выполняет внутреннюю итерацию и в данном случае по строке. Как насчет производительности? Мне придется проверить, так как я чувствую, что отправка каждой строки в процесс отдельно будет неэффективной?
Насколько я понимаю, если вы используете многопоточность (ThreadPool
), то внутренние очереди ввода и вывода являются экземплярами queue.SimpleQueue
, а не multiprocessing.queues.SimpleQueue
, и значение chunksize становится менее важным. При многопроцессорной обработке лучше выполнять меньше, хотя и больше, операций получения и помещения в эти очереди, и поэтому вам следует использовать явное разбиение на фрагменты при использовании imap
с большими итерациями (метод map
вычисляет размер фрагмента по умолчанию, если указан None
).
Постарайтесь сделать каждый фрагмент достаточно большим, чтобы работнику требовалось хотя бы несколько сот миллисекунд на каждый фрагмент, или просто попробуйте 1,10,100,1000, пока не увидите улучшения в количестве задач, выполняемых в секунду, вы никогда не найдете ни одного оптимального числа. в любом случае.
При вызове map
с указанным chunksize=None map
преобразует переданный итерируемый объект в список, если необходимо, чтобы получить его длину (iterable_size
), а затем вычисляет chunksize, remainder = divmod(iterable_size, 4 * pool_size)
. Тогда if remainder: chunksize += 1
. Если использовать imap
с многопроцессорной обработкой, я бы аппроксимировал размер итерации и использовал тот же расчет для вашего аргумента размера фрагмента. По сути, каждый процесс пула будет обрабатывать примерно 4 больших куска. Если вы используете многопоточность, посмотрите, имеет ли значение разделение на фрагменты.
imap
фрагментирование — это деталь реализации, ваша функция будет вызываться с одним элементом итерируемого объекта независимо от размера фрагмента.
for line in file:
result = my_func(line)
становится
for result in pool.imap(my_func, file, some_chunk_size):
pass
Синтаксически он такой же, как builtins.map без размера чанка, но люди его тоже не используют.
Размер фрагмента определяет, сколько элементов упаковывается в список перед их травлением и отправкой по каналу, процесс фрагментирования прозрачен для вашего кода, ваша функция всегда будет вызываться с помощью одной строки, независимо от размера фрагмента, вам следует поиграться с размером фрагмента и посмотрите, улучшит ли это производительность в вашем случае.
Обратите внимание, что imap
внутри имеет цикл for, поэтому он должен работать точно так же, как простой цикл for для итерируемого объекта.
Синтаксически он такой же, как map
, но люди его тоже не используют. Это так неправильно. Люди, конечно, его используют, и между map
и imap
огромная разница. Во-первых, map
принимает свой итерируемый аргумент (в данном случае file
), и если это не список, он преобразует его в список, как если бы вы указали pool.map(my_func, list(file)
. Таким образом, может быть большая разница, связанная с использованием памяти. Во-вторых, с помощью map
вы не получите результатов, пока все результаты не будут возвращены в виде списка. С помощью imap
вы можете перебирать результаты по мере того, как они возвращаются рабочей функцией my_func
.
При использовании imap
и imap_unordered
, поскольку размер итерируемого объекта обычно неизвестен без преобразования его в список, он использует размер фрагмента по умолчанию, равный 1. Для rmultiprocessing, в частности, при работе с большими итерируемыми объектами, вам следует указать размер фрагмента.
@Booboo, у меня builtins.map
ссылка, а не pool.map
... мне следует сделать это более явным в тексте.
Тем не менее, существует огромная разница между multiprocessing.pool.Pool.imap
и mutlirpcoessing.pool.Pool.map
. Зачем вам сравнивать imap
со встроенным map
? И люди используют встроенный map
.
Разбиение на части не так важно для multiprocessing.pool.ThreadPool
, потому что накладные расходы на помещение задач во внутреннюю очередь задач не так велики, как в случае многопроцессорной обработки, где лучше выполнять большее количество операций, но меньшее количество операций.
num_workers
будет вызываться для каждого элементаf
, который будет строкой. Я не могу воспроизвести вашу проблему. Вы нашли ошибку в Python? Какая у вас платформа и версия Python?