Обрабатывать элементы порциями, используя многопроцессорные очереди

У меня есть многопроцессорная очередь; Конец очереди сигнализируется с помощью значения SENTINEL, строки.

aq =  Queue() 

................................

Экземпляр в очереди относится к классу A:

class A:
 id: str
 desc: str

В функции я получаю элементы из очереди aq и обрабатываю их кусками. Первый элемент (если он только один) может быть SENTINEL, обрабатывать нечего. ....

def process:
  chunk_data = []
  all = [
  item = aq.get()
  if not isinstance(item, A):
    return
  chunk_data.append(item.id)
  while item != SENTINEL:
   # start process in chunks
   # adding elements to the chunk list until is full
    while len(chunk_data) < CHUNK_MAX_SIZE: # 50
      item = aq.get()
      if item == SENTINEL:
        break
      chunk_data.append(item.id)
    # the chunk list is full start processing
    chunk_process_ids = process_data(chunk_data) # process chunks
    all.extend(chunk_process_ids)
    # empty chunk list and start again
    chunk_data.clear()  

Функция работает, как и ожидалось, но я считаю код запутанным. Я ищу простую, более понятную версию.

Эта функция не работает. Это даже синтаксически неправильно.

Kelly Bundy 13.04.2023 10:07
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
1
157
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Я бы предпочел структурировать код следующим образом:

def get_chunks():
    chunk_data = []
    while True:
        item = aq.get()
        if item == SENTINEL: # or: if not isinstance(item A):
            break
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk
            chunk_data = []
    # Do we have a "small" chunk?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    for chunk_data in get_chunks():
        all.extend(process_data(chunk_data))

Но "писателю" предпочтительнее было бы использовать get_chunks, чтобы в очередь записывались уже готовые чанки. Это привело бы к меньшему (но большему) обращению к очереди, что в целом было бы более эффективным.

Вот пример, в котором я предполагаю, что все экземпляры A находятся в списке, list_of_a_instances:

def get_chunks():
    chunk_data = []
    for item in list_of_a_instances: # a list of all the A instances, for example
        chunk_data.append(item.id)
        if len(chunk_data) == CHUNK_MAX_SIZE:
            yield chunk
            chunk_data = []
    # Do we have a "small" chunk?
    if chunk_data:
        yield chunk_data

def process():
    all = []
    while True:
        chunk_data = aq.get()
        if chunk_data == SENTINEL:
            break
        all.extend(process_data(chunk_data))

def writer():
    for chunk_data in get_chunks():
        aq.put(chunk_data)
    aq.put(SENTINEL)
Ответ принят как подходящий

В интересах следования принципу DRY вот то, что я считаю более чистой версией вашего кода без повторения логики. Обратите внимание, что обычно лучше обрабатывать ошибки, вызывая исключение, чем просто возвращать значение, когда тип входного значения не соответствует ожидаемому.

def process():
    all = []
    while True:
        chunk_data = []
        for _ in range(CHUNK_MAX_SIZE):
            if (item := aq.get()) == SENTINEL:
                break
            assert isinstance(item, A)
            chunk_data.append(item.id)
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break

Вы также можете немного почистить код с помощью iter и itertools.islice, если вам не нужно проверять, относится ли каждый элемент к типу A (чего в любом случае не следует делать, если у вас есть контроль над код, который ставит элементы в очередь):

from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    while True:
        chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
        if chunk_data:
            all.extend(process_data(chunk_data))
        if len(chunk_data) < CHUNK_MAX_SIZE:
            break

Кредит @KellyBundy для более краткой версии, как указано ниже:

from itertools import islice
from operator import attrgetter

def process():
    all = []
    data = iter(aq.get, SENTINEL)
    ids = map(attrgetter('id'), data)
    while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
        all += process_data(chunk_ids)

Немного дальше: код

Kelly Bundy 13.04.2023 10:31

Ааа.. простое упоминание itertools в любом месте поста вызывает iter-бога @KellyBundy :-D. Действительно, вся проверка len(chunk_data) < CHUNK_MAX_SIZE избыточна, когда мы используем islice. Остальные уборки тоже хороши. Спасибо!

blhsing 13.04.2023 10:35

Да, эти два отдельных чека "мы еще не закончили" меня раздражали :-)

Kelly Bundy 13.04.2023 10:38

«пустой список в качестве ввода» - этого не происходит. Если список пуст, цикл while завершается, он не передается их функции.

Kelly Bundy 13.04.2023 10:40

Вы могли бы сказать: «Если ваша функция process_data хорошо работает с кортежем в качестве входных данных, вы можете использовать рецепт batched ».

Kelly Bundy 13.04.2023 10:42

Ах, да, столько избыточной логики устранено с помощью выражения присваивания в качестве условия while.

blhsing 13.04.2023 10:42

Ах, действительно, хороший рецепт. Но мы можем просто передать islice конструктору list вместо tuple, верно?

blhsing 13.04.2023 10:45

(Но я не думаю, что batched здесь сильно поможет, даже если в Python 3.12 он будет использоваться как настоящий itertool, а не просто как рецепт.)

Kelly Bundy 13.04.2023 10:46

Вы имеете в виду изменить рецепт? Конечно. Но тогда вам действительно придется писать его самостоятельно, а не импортировать из more-itertools. И тогда я думаю, что действительно проще просто встроить его, как я это сделал.

Kelly Bundy 13.04.2023 10:49

@KellyBundy Верно. Теперь я понимаю вашу точку зрения ... только после того, как я изменил ваш код с помощью модифицированного рецепта. :-)

blhsing 13.04.2023 10:55
Модифицированный код
blhsing 13.04.2023 10:55

Хех, даже я бы не стал использовать здесь chain :-). Я бы использовал только for chunk_ids in batched(ids, CHUNK_MAX_SIZE): вместо while ...:.

Kelly Bundy 13.04.2023 10:59

Другие вопросы по теме