Как обрабатывать поток данных с помощью tensorflow и fifoqueue

Я пытаюсь обрабатывать данные, которые постоянно поступают в нейронную сеть. Для оптимизации всего процесса у меня есть:

  • Поток, отвечающий за заполнение FIFOQueue.
  • Другой поток, который должен извлечь как можно больше элементов из очереди до порогового значения. То есть: min (queue.size (), MAX_BATCH_SIZE). MAX_BATCH_SIZE - это максимум, который я могу обработать одновременно.

Мне удалось заставить его работать, но это что-то элементарное, и мне интересно, есть ли лучшая реализация этого метода. Я сосредотачиваюсь на потребительском потоке очереди.

Определение операций:

self.queue = tf.FIFOQueue ...
self.input_dequeue_op = self.queue.dequeue_many(
                                tf.maximum(
                                    tf.constant(1), 
                                    tf.minimum(
                                        tf.constant(MAX_BATCH_SIZE), 
                                        self.queue.size()
                                    )
                                )
                            )

Потребительский поток:

with network.sess.as_default():
    while True:

            results = network.sess.run(self.input_dequeue_op)

По сути, я намереваюсь отправить в нейронную сеть максимальное количество элементов, которые в настоящее время находятся в очереди, до MAX_BATCH_SIZE (это максимум, который может быть обработан одновременно).

Операция получает максимальную реальную партию, которую я могу загрузить. Мне это кажется очень примитивной формой, я тоже пробовал использовать ее напрямую:

self.input_dequeue_op = self.queue.dequeue_up_to(tf.constant(MAX_BATCH_SIZE))

Но очередь блокируется на неопределенный срок, даже если элементы добавляются.

Есть ли другой способ сделать это лучше?

Можете ли вы рассказать о характере вашего приложения? Например, вы проводите логический вывод или хотите обучить модель?

kvish 01.11.2018 20:38

Вывод, модель уже обучена. Система действует как демон, постоянно ожидающий обработки изображений, но, поскольку он работает на графическом процессоре для оптимизации процесса, требуется самый большой пакет, который сеть может обрабатывать одновременно: max (1, min (q.size (), MAX_BATCH_SIZE)).

Adria Ciurana 02.11.2018 17:25

Извините за поздний ответ. Как-то пропустил уведомление. Вы думали об использовании QueueRunners и координаторов? Они намного лучше справляются с управлением различными потоками, поэтому никогда не возникает блокировок или исчерпания очередей.

kvish 06.11.2018 15:07

Это не проблема, я ценю вашу помощь. С одной стороны, система использует демон, который использует FIFOQueue (один поток), а с другой стороны, несколько потоков используются для заполнения очереди (по одному на запрос). Эти потоки также выполняют другие операции, и по этой причине используются стандарты Python. Можно ли это сделать с новым api tf.data?

Adria Ciurana 07.11.2018 09:48

существует метод parallel_interleave, который, возможно, может создать набор данных tf.data, извлекая данные из различных файлов, что похоже на то, что вы пытаетесь достичь здесь. Но я не уверен, насколько оптимизировано такое решение для вывода. Для обучения у вас будут большие разрозненные источники данных, и вам нужно будет загружать их все повторно.

kvish 07.11.2018 17:03
0
5
160
0

Другие вопросы по теме