Настройка задачи сельдерея с кешем памяти для видеокадров в качестве стратегии кольцевого буфера в Python

Я хочу построить конвейер многозадачной обработки на Сельдерей и хочу, чтобы несколько задач обрабатывали один и тот же видеофайл. Задачи должны делиться видеоданными. Поэтому не каждая задача должна декодировать и извлекать кадры из видеофайла. Видеоданные будут представлять собой список извлеченных кадров (не каждый кадр видео необходим).

Есть ли какое-нибудь решение для эффективного обмена этими кадрами? Задачи можно обрабатывать на разных узлах. Но я не хочу делиться данными по сети, как Memcached или Redis. Задача должна искать видеоданные в памяти / кеше, если ее нет, задача должна выдать другую задачу для загрузки видео и извлечения кадров в кеш.

(производитель и несколько потребителей для каждого видеофайла)

Таким образом, задачи на одном узле / машине могут совместно использовать кэшированные данные. Две задачи на разных узлах не имеют преимуществ при кэшировании.

Я не хочу кэшировать все извлеченное видео, должно быть какое-то циклическое кеширование буфера. Кэш для каждого видео имеет фиксированный размер, скажем, 100 кадров. Разрыв между самой быстрой и самой медленной задачей не может превышать 100 кадров. Всего в памяти / кеше всего 100 кадров.

Возникает два основных вопроса:

  1. Настройка задачи

    Задача A: извлечение кадров из видео (производитель в память / кеш)

    Задача B1: потребление кадров и выполнение фактической работы (обработка кадров)

    . .

    Задача Bn: потребление фреймов и выполнение фактической работы (обработка фреймов)

    A, B1 - Bn работают параллельно. Но тогда эти задачи должны выполняться на одном узле. Если таксы B распределяются на разных узлах, что-то должно порождать другую задачу A (по одной на каждом узле для декодирования и извлечения кадров). Какой подход вы здесь порекомендуете? Что было бы лучшим выбором?

  2. Кеш Python

    Существуют ли какие-либо кеш-библиотеки / реализации / решения, которые лучше всего подходят для моего варианта использования для кеширования больших данных на локальном компьютере с некоторой реализацией кольцевого буфера? Что-то вроде DiskCache, но с возможностью кэшировать только 100 кадров путем кольцевой буферизации.

Какие подходы и конструкции вы рекомендуете для реализации моего варианта использования? Я хотел бы придерживаться Celery для распределения задач.

вы хотите, чтобы медленные процессы отбрасывали кадры или вы хотите, чтобы быстро выполняющиеся процессы ожидали медленных?

Aaron 30.10.2018 17:27

второй. в примере задача A будет ждать самой медленной задачи B, когда кольцевой буфер заполнен (указатель чтения на 100 кадров от указателя записи задачи A)

Frey 30.10.2018 19:15
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
2
590
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Возможно, это показывает мое упрямство, но я всегда находил, что проекты вроде сельдерея, которые добавляют кучу сложности поверх многопроцессорной обработки (которая и так сложна), создают больше проблем, чем они того стоят. Также нет лучшей альтернативы использованию разделяемой памяти stdlib и мьютексов с точки зрения скорости и простоты.

Для вашего случая простым решением было бы просто использовать очередь FIFO для каждого процесса и помещать в каждый кадр от производителя. Это, естественно, привело бы к большому использованию памяти, если бы вы делали n копий каждого кадра для n потребителей, однако вы, вероятно, довольно легко могли бы придумать механизм, чтобы поместить сами кадры в multiprocessing.sharedctypes.Array и вместо этого передавать только индексы через очередь . Пока очереди ограничены по длине короче, чем длина буфера, вы должны быть ограничены от перезаписи кадра в буфере до тех пор, пока он не будет использован всеми потребителями. Без какой-либо синхронизации это будет пролетать мимо ваших штанов, но немного магии мьютекса определенно может сделать это очень надежным решением.

Например:

import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce

BUFSHAPE = (10,10,10) #10 10x10 images in buffer

class Worker(Process):
    def __init__(self, q_size, buffer, name=''):
        super().__init__()
        self.queue = Queue(q_size)
        self.buffer = buffer
        self.name = name

    def run(self,): #do work here
        #I hardcoded datatype here. you might need to communicate it to the child process
        buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
        buf_arr.shape = BUFSHAPE
        while True:
            item = self.queue.get()
            if item == 'done':
                print('child process: {} completed all frames'.format(self.name))
                return
            with self.buffer.get_lock(): #prevent writing while we're reading
                #slice the frame from the array uning the index that was sent
                frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
            #do some intense processing on `frame`
            sleep(np.random.rand())
            print('child process: {} completed frame: {}'.format(self.name, item))

def main():
    #creating shared array
    buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
    #make a numpy.array using that memory location to make it easy to stuff data into it
    buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
    buf_arr.shape = BUFSHAPE
    #create a list of workers
    workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
                      buffer, #pass in shared buffer array
                      str(i)) #numbered child processes
                      for i in range(5)] #5 workers

    for worker in workers: #start the workers
        worker.start()
    for i in range(100): #generate 100 random frames to send to workers
        #insert a frame into the buffer
        with buffer.get_lock(): #prevent reading while we're writing
            buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
        #send the frame number to each worker for processing. If the input queue is full, this will block until there's space
        # this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
        for worker in workers:
            worker.queue.put(i)
    #when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
    for worker in workers:
        worker.queue.put('done')
        worker.join()


if __name__ == "__main__":
    freeze_support()
    main()

РЕДАКТИРОВАТЬ

Какая-то ошибка нечеткости требует, чтобы очередь была на 2 кадра меньше, чем буфер, а не на 1 кадр меньше, чтобы предотвратить перезапись кадра раньше его времени.

EDIT2 - объяснение первого редактирования:

Причина появления len(q) = len(buf)-2 заключается в том, что q.get() вызывается до того, как мы получаем кадр из буфера, а сам кадр записывается до того, как мы попытаемся поместить индекс в очередь. Если разница в длине составляет всего 1, рабочий может вытащить индекс кадра из очереди, тогда производитель может увидеть, что он может отправить его в очередь сейчас и перейти к следующему кадру, прежде чем рабочий сможет прочитать этот кадр. сам. Есть много способов подойти к этому по-другому, что может позволить меньшему количеству процессов все время ждать друг друга, возможно, используя mp.Event.

Другие вопросы по теме