Я хочу построить конвейер многозадачной обработки на Сельдерей и хочу, чтобы несколько задач обрабатывали один и тот же видеофайл. Задачи должны делиться видеоданными. Поэтому не каждая задача должна декодировать и извлекать кадры из видеофайла. Видеоданные будут представлять собой список извлеченных кадров (не каждый кадр видео необходим).
Есть ли какое-нибудь решение для эффективного обмена этими кадрами? Задачи можно обрабатывать на разных узлах. Но я не хочу делиться данными по сети, как Memcached или Redis. Задача должна искать видеоданные в памяти / кеше, если ее нет, задача должна выдать другую задачу для загрузки видео и извлечения кадров в кеш.
(производитель и несколько потребителей для каждого видеофайла)
Таким образом, задачи на одном узле / машине могут совместно использовать кэшированные данные. Две задачи на разных узлах не имеют преимуществ при кэшировании.
Я не хочу кэшировать все извлеченное видео, должно быть какое-то циклическое кеширование буфера. Кэш для каждого видео имеет фиксированный размер, скажем, 100 кадров. Разрыв между самой быстрой и самой медленной задачей не может превышать 100 кадров. Всего в памяти / кеше всего 100 кадров.
Возникает два основных вопроса:
Настройка задачи
Задача A: извлечение кадров из видео (производитель в память / кеш)
Задача B1: потребление кадров и выполнение фактической работы (обработка кадров)
. .
Задача Bn: потребление фреймов и выполнение фактической работы (обработка фреймов)
A, B1 - Bn работают параллельно. Но тогда эти задачи должны выполняться на одном узле. Если таксы B распределяются на разных узлах, что-то должно порождать другую задачу A (по одной на каждом узле для декодирования и извлечения кадров). Какой подход вы здесь порекомендуете? Что было бы лучшим выбором?
Кеш Python
Существуют ли какие-либо кеш-библиотеки / реализации / решения, которые лучше всего подходят для моего варианта использования для кеширования больших данных на локальном компьютере с некоторой реализацией кольцевого буфера? Что-то вроде DiskCache, но с возможностью кэшировать только 100 кадров путем кольцевой буферизации.
Какие подходы и конструкции вы рекомендуете для реализации моего варианта использования? Я хотел бы придерживаться Celery для распределения задач.
второй. в примере задача A будет ждать самой медленной задачи B, когда кольцевой буфер заполнен (указатель чтения на 100 кадров от указателя записи задачи A)
Возможно, это показывает мое упрямство, но я всегда находил, что проекты вроде сельдерея, которые добавляют кучу сложности поверх многопроцессорной обработки (которая и так сложна), создают больше проблем, чем они того стоят. Также нет лучшей альтернативы использованию разделяемой памяти stdlib и мьютексов с точки зрения скорости и простоты.
Для вашего случая простым решением было бы просто использовать очередь FIFO для каждого процесса и помещать в каждый кадр от производителя. Это, естественно, привело бы к большому использованию памяти, если бы вы делали n копий каждого кадра для n потребителей, однако вы, вероятно, довольно легко могли бы придумать механизм, чтобы поместить сами кадры в multiprocessing.sharedctypes.Array
и вместо этого передавать только индексы через очередь . Пока очереди ограничены по длине короче, чем длина буфера, вы должны быть ограничены от перезаписи кадра в буфере до тех пор, пока он не будет использован всеми потребителями. Без какой-либо синхронизации это будет пролетать мимо ваших штанов, но немного магии мьютекса определенно может сделать это очень надежным решением.
Например:
import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce
BUFSHAPE = (10,10,10) #10 10x10 images in buffer
class Worker(Process):
def __init__(self, q_size, buffer, name=''):
super().__init__()
self.queue = Queue(q_size)
self.buffer = buffer
self.name = name
def run(self,): #do work here
#I hardcoded datatype here. you might need to communicate it to the child process
buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
buf_arr.shape = BUFSHAPE
while True:
item = self.queue.get()
if item == 'done':
print('child process: {} completed all frames'.format(self.name))
return
with self.buffer.get_lock(): #prevent writing while we're reading
#slice the frame from the array uning the index that was sent
frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
#do some intense processing on `frame`
sleep(np.random.rand())
print('child process: {} completed frame: {}'.format(self.name, item))
def main():
#creating shared array
buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
#make a numpy.array using that memory location to make it easy to stuff data into it
buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
buf_arr.shape = BUFSHAPE
#create a list of workers
workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
buffer, #pass in shared buffer array
str(i)) #numbered child processes
for i in range(5)] #5 workers
for worker in workers: #start the workers
worker.start()
for i in range(100): #generate 100 random frames to send to workers
#insert a frame into the buffer
with buffer.get_lock(): #prevent reading while we're writing
buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
#send the frame number to each worker for processing. If the input queue is full, this will block until there's space
# this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
for worker in workers:
worker.queue.put(i)
#when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
for worker in workers:
worker.queue.put('done')
worker.join()
if __name__ == "__main__":
freeze_support()
main()
РЕДАКТИРОВАТЬ
Какая-то ошибка нечеткости требует, чтобы очередь была на 2 кадра меньше, чем буфер, а не на 1 кадр меньше, чтобы предотвратить перезапись кадра раньше его времени.
EDIT2 - объяснение первого редактирования:
Причина появления len(q) = len(buf)-2
заключается в том, что q.get()
вызывается до того, как мы получаем кадр из буфера, а сам кадр записывается до того, как мы попытаемся поместить индекс в очередь. Если разница в длине составляет всего 1, рабочий может вытащить индекс кадра из очереди, тогда производитель может увидеть, что он может отправить его в очередь сейчас и перейти к следующему кадру, прежде чем рабочий сможет прочитать этот кадр. сам. Есть много способов подойти к этому по-другому, что может позволить меньшему количеству процессов все время ждать друг друга, возможно, используя mp.Event
.
вы хотите, чтобы медленные процессы отбрасывали кадры или вы хотите, чтобы быстро выполняющиеся процессы ожидали медленных?