Почему итерация разбивает строки моего текстового файла, а генератор — нет?

Для каждой строки текстового файла я хочу выполнить тяжелые вычисления. Количество строк может достигать миллионов, поэтому я использую многопроцессорность:

num_workers = 1
with open(my_file, 'r') as f:
    with multiprocessing.pool.ThreadPool(num_workers) as pool:    
        for data in pool.imap(my_func, f, 100):
            print(data)

Поэтому я тестирую в интерактивном режиме ThreadPool() (будет заменено в финальной версии). В документации map или imap говорится:

Этот метод разбивает итерируемый объект на несколько фрагментов, которые он передает в пул процессов как отдельные задачи.

Поскольку мой открытый файл является итерируемым (каждая итерация представляет собой строку), я ожидаю, что это сработает, но строки разбиваются посередине. Я сделал генератор, который возвращает строки как положено, но я хочу понять, зачем вообще нужен генератор. Почему фрагментирование не происходит на границах строк?

ОБНОВЛЯТЬ:

чтобы уточнить, использую ли я этот генератор:

def chunked_reader(file, chunk_size=100):
    with open(file, 'r') as f:
        chunk = []
        i = 0
        for line in f:            
            if i == chunk_size:
                yield chunk
                chunk = []
                i = 0
            i += 1            
            chunk.append(line)
    # yield final chunk
    yield chunk

расчет работает, возвращает ожидаемые значения. Если я использую файловый объект напрямую, я получаю ошибки, указывающие на то, что фрагментирование не разбивается на границы строк.

ОБНОВЛЕНИЕ 2:

он разбивает строку на каждый отдельный символ, а не построчно. Я также могу использовать другую среду с Python 3.9, и она будет иметь такое же поведение. Так было уже некоторое время, и кажется, что это «так, как задумано», но не очень интуитивно понятно.

ОБНОВЛЕНИЕ 3:

Чтобы прояснить отмеченное решение, я понял, что chunk_size отправит список данных для обработки в my_func. Внутренне my_func перебирает переданные данные. Но поскольку мое предположение неверно и каждая строка отправляется в my_func отдельно независимо от chunk_size, внутренняя итерация выполнялась по строке, а не по списку строк, как я ожидал.

num_workers будет вызываться для каждого элемента f, который будет строкой. Я не могу воспроизвести вашу проблему. Вы нашли ошибку в Python? Какая у вас платформа и версия Python?
Booboo 15.07.2024 22:03

Это Python 3.12.4 для Windows

beginner_ 16.07.2024 08:08

вы думаете, что функция вызывается с часком в качестве входных данных, поэтому вы перебираете входные данные внутри my_func. Это не. Он всегда вызывается с помощью одной строки или одного элемента итерируемого объекта. Отсюда мой ответ...

Ahmed AEK 16.07.2024 08:27

Хорошо, наконец-то понял, и это объясняет, почему проблема возникает, когда сама my_func выполняет внутреннюю итерацию и в данном случае по строке. Как насчет производительности? Мне придется проверить, так как я чувствую, что отправка каждой строки в процесс отдельно будет неэффективной?

beginner_ 16.07.2024 10:47

Насколько я понимаю, если вы используете многопоточность (ThreadPool), то внутренние очереди ввода и вывода являются экземплярами queue.SimpleQueue, а не multiprocessing.queues.SimpleQueue, и значение chunksize становится менее важным. При многопроцессорной обработке лучше выполнять меньше, хотя и больше, операций получения и помещения в эти очереди, и поэтому вам следует использовать явное разбиение на фрагменты при использовании imap с большими итерациями (метод map вычисляет размер фрагмента по умолчанию, если указан None).

Booboo 16.07.2024 12:34

Постарайтесь сделать каждый фрагмент достаточно большим, чтобы работнику требовалось хотя бы несколько сот миллисекунд на каждый фрагмент, или просто попробуйте 1,10,100,1000, пока не увидите улучшения в количестве задач, выполняемых в секунду, вы никогда не найдете ни одного оптимального числа. в любом случае.

Ahmed AEK 16.07.2024 12:38

При вызове map с указанным chunksize=None map преобразует переданный итерируемый объект в список, если необходимо, чтобы получить его длину (iterable_size), а затем вычисляет chunksize, remainder = divmod(iterable_size, 4 * pool_size) . Тогда if remainder: chunksize += 1. Если использовать imap с многопроцессорной обработкой, я бы аппроксимировал размер итерации и использовал тот же расчет для вашего аргумента размера фрагмента. По сути, каждый процесс пула будет обрабатывать примерно 4 больших куска. Если вы используете многопоточность, посмотрите, имеет ли значение разделение на фрагменты.

Booboo 16.07.2024 12:57
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
3
7
124
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

imap фрагментирование — это деталь реализации, ваша функция будет вызываться с одним элементом итерируемого объекта независимо от размера фрагмента.

for line in file:
    result = my_func(line)

становится

for result in pool.imap(my_func, file, some_chunk_size):
    pass

Синтаксически он такой же, как builtins.map без размера чанка, но люди его тоже не используют.

Размер фрагмента определяет, сколько элементов упаковывается в список перед их травлением и отправкой по каналу, процесс фрагментирования прозрачен для вашего кода, ваша функция всегда будет вызываться с помощью одной строки, независимо от размера фрагмента, вам следует поиграться с размером фрагмента и посмотрите, улучшит ли это производительность в вашем случае.

Обратите внимание, что imap внутри имеет цикл for, поэтому он должен работать точно так же, как простой цикл for для итерируемого объекта.

Синтаксически он такой же, как map, но люди его тоже не используют. Это так неправильно. Люди, конечно, его используют, и между map и imap огромная разница. Во-первых, map принимает свой итерируемый аргумент (в данном случае file), и если это не список, он преобразует его в список, как если бы вы указали pool.map(my_func, list(file). Таким образом, может быть большая разница, связанная с использованием памяти. Во-вторых, с помощью map вы не получите результатов, пока все результаты не будут возвращены в виде списка. С помощью imap вы можете перебирать результаты по мере того, как они возвращаются рабочей функцией my_func.

Booboo 15.07.2024 21:57

При использовании imap и imap_unordered, поскольку размер итерируемого объекта обычно неизвестен без преобразования его в список, он использует размер фрагмента по умолчанию, равный 1. Для rmultiprocessing, в частности, при работе с большими итерируемыми объектами, вам следует указать размер фрагмента.

Booboo 15.07.2024 21:59

@Booboo, у меня builtins.map ссылка, а не pool.map ... мне следует сделать это более явным в тексте.

Ahmed AEK 15.07.2024 21:59

Тем не менее, существует огромная разница между multiprocessing.pool.Pool.imap и mutlirpcoessing.pool.Pool.map. Зачем вам сравнивать imap со встроенным map? И люди используют встроенный map.

Booboo 15.07.2024 22:00

Разбиение на части не так важно для multiprocessing.pool.ThreadPool, потому что накладные расходы на помещение задач во внутреннюю очередь задач не так велики, как в случае многопроцессорной обработки, где лучше выполнять большее количество операций, но меньшее количество операций.

Booboo 15.07.2024 22:05

Другие вопросы по теме