У меня есть многопроцессорная очередь; Конец очереди сигнализируется с помощью значения SENTINEL, строки.
aq = Queue()
................................
Экземпляр в очереди относится к классу A:
class A:
id: str
desc: str
В функции я получаю элементы из очереди aq
и обрабатываю их кусками.
Первый элемент (если он только один) может быть SENTINEL, обрабатывать нечего.
....
def process:
chunk_data = []
all = [
item = aq.get()
if not isinstance(item, A):
return
chunk_data.append(item.id)
while item != SENTINEL:
# start process in chunks
# adding elements to the chunk list until is full
while len(chunk_data) < CHUNK_MAX_SIZE: # 50
item = aq.get()
if item == SENTINEL:
break
chunk_data.append(item.id)
# the chunk list is full start processing
chunk_process_ids = process_data(chunk_data) # process chunks
all.extend(chunk_process_ids)
# empty chunk list and start again
chunk_data.clear()
Функция работает, как и ожидалось, но я считаю код запутанным. Я ищу простую, более понятную версию.
Я бы предпочел структурировать код следующим образом:
def get_chunks():
chunk_data = []
while True:
item = aq.get()
if item == SENTINEL: # or: if not isinstance(item A):
break
chunk_data.append(item.id)
if len(chunk_data) == CHUNK_MAX_SIZE:
yield chunk
chunk_data = []
# Do we have a "small" chunk?
if chunk_data:
yield chunk_data
def process():
all = []
for chunk_data in get_chunks():
all.extend(process_data(chunk_data))
Но "писателю" предпочтительнее было бы использовать get_chunks
, чтобы в очередь записывались уже готовые чанки. Это привело бы к меньшему (но большему) обращению к очереди, что в целом было бы более эффективным.
Вот пример, в котором я предполагаю, что все экземпляры A
находятся в списке, list_of_a_instances
:
def get_chunks():
chunk_data = []
for item in list_of_a_instances: # a list of all the A instances, for example
chunk_data.append(item.id)
if len(chunk_data) == CHUNK_MAX_SIZE:
yield chunk
chunk_data = []
# Do we have a "small" chunk?
if chunk_data:
yield chunk_data
def process():
all = []
while True:
chunk_data = aq.get()
if chunk_data == SENTINEL:
break
all.extend(process_data(chunk_data))
def writer():
for chunk_data in get_chunks():
aq.put(chunk_data)
aq.put(SENTINEL)
В интересах следования принципу DRY вот то, что я считаю более чистой версией вашего кода без повторения логики. Обратите внимание, что обычно лучше обрабатывать ошибки, вызывая исключение, чем просто возвращать значение, когда тип входного значения не соответствует ожидаемому.
def process():
all = []
while True:
chunk_data = []
for _ in range(CHUNK_MAX_SIZE):
if (item := aq.get()) == SENTINEL:
break
assert isinstance(item, A)
chunk_data.append(item.id)
if chunk_data:
all.extend(process_data(chunk_data))
if len(chunk_data) < CHUNK_MAX_SIZE:
break
Вы также можете немного почистить код с помощью iter и itertools.islice, если вам не нужно проверять, относится ли каждый элемент к типу A
(чего в любом случае не следует делать, если у вас есть контроль над код, который ставит элементы в очередь):
from itertools import islice
from operator import attrgetter
def process():
all = []
data = iter(aq.get, SENTINEL)
while True:
chunk_data = list(map(attrgetter('id'), islice(data, CHUNK_MAX_SIZE)))
if chunk_data:
all.extend(process_data(chunk_data))
if len(chunk_data) < CHUNK_MAX_SIZE:
break
Кредит @KellyBundy для более краткой версии, как указано ниже:
from itertools import islice
from operator import attrgetter
def process():
all = []
data = iter(aq.get, SENTINEL)
ids = map(attrgetter('id'), data)
while chunk_ids := list(islice(ids, CHUNK_MAX_SIZE)):
all += process_data(chunk_ids)
Немного дальше: код
Ааа.. простое упоминание itertools
в любом месте поста вызывает iter
-бога @KellyBundy :-D. Действительно, вся проверка len(chunk_data) < CHUNK_MAX_SIZE
избыточна, когда мы используем islice
. Остальные уборки тоже хороши. Спасибо!
Да, эти два отдельных чека "мы еще не закончили" меня раздражали :-)
«пустой список в качестве ввода» - этого не происходит. Если список пуст, цикл while завершается, он не передается их функции.
Вы могли бы сказать: «Если ваша функция process_data хорошо работает с кортежем в качестве входных данных, вы можете использовать рецепт batched
».
Ах, да, столько избыточной логики устранено с помощью выражения присваивания в качестве условия while
.
Ах, действительно, хороший рецепт. Но мы можем просто передать islice
конструктору list
вместо tuple
, верно?
(Но я не думаю, что batched
здесь сильно поможет, даже если в Python 3.12 он будет использоваться как настоящий itertool, а не просто как рецепт.)
Вы имеете в виду изменить рецепт? Конечно. Но тогда вам действительно придется писать его самостоятельно, а не импортировать из more-itertools. И тогда я думаю, что действительно проще просто встроить его, как я это сделал.
@KellyBundy Верно. Теперь я понимаю вашу точку зрения ... только после того, как я изменил ваш код с помощью модифицированного рецепта. :-)
Хех, даже я бы не стал использовать здесь chain
:-). Я бы использовал только for chunk_ids in batched(ids, CHUNK_MAX_SIZE):
вместо while ...:
.
Эта функция не работает. Это даже синтаксически неправильно.