Я пишу код, который должен иметь следующую структуру:
Может ли кто-нибудь дать мне код / псевдокод о том, как сделать что-то подобное. В основном от того, какой буфер мне следует использовать и как потоки взаимодействуют, чтобы все данные собирались и обрабатывались правильно.
Я читал о классах «Очередь» и «Потоковая обработка» в Python. Однако я до сих пор не уверен, как это сделать правильно. Я пробовал несколько простых примеров с объектом threading.Event (), однако результаты противоречивы.
Большое спасибо!





Если ваш рабочий может видеть, что сборщики собираются, то это так же просто, как проверка, пока потоки не завершены, ИЛИ очередь не пуста.
Если ваш рабочий не может видеть состояние сборщиков или даже их количество, вы можете заставить сборщики сообщать это состояние в самой очереди вместе с данными.
Представьте, что у объекта данных есть поле происхождения и логическое поле, которое нужно пометить как открытое или закрытое. Например, сборщик 1 отправит что-то вроде:
data-1-opened, data-1-opened ... data-1-closed.
Рабочий должен только поддерживать набор идентификаторов сборщиков, добавляя новые записи по мере их появления и удаляя их при закрытии. Таким образом, условием выхода является то, что набор пуст И очередь данных пуста.