Тупик в очереди многопроцессорной обработки Python

Я использую очереди из библиотеки многопроцессорности для обмена данными между процессами.

У меня 2 очереди, обе ограничены 10 объектами, в первой очереди есть один процесс, который «помещает» в нее объекты, и многие процессы «получают» от него.

Во второй очереди есть много процессов, которые «помещают» в нее объекты, и только один процесс «получает» от нее.

Система какое-то время работает отлично, а потом начинает вести себя странно: только процесс, который «помещает» объекты в первую очередь, продолжает работать, пока процессы, которые читают из первой очереди, по-видимому, больше не ведут себя / не работают (даже если процессы живы). Кажется, здесь тупик, но я не уверен, вот мой код:

ОБНОВЛЕНО

import multiprocessing
import logging
from multiprocessing import Process

logger = logging.get_logger(__name__)

# Processes 2, 3 ,4:

class Processes_234(Process):
    def __init__(self, message_queue_1, message_queue_2):
        Process.__init__(self)
        self.message_queue_1 = message_queue_1
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el3 = self.message_queue_1.get()
                logger.debug('Processes234: get from queue')
            except Exception as exp:
                logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))

            # do some stuff with el1, el2, el3...

            try:
                # put into second queue
                self.message_queue_2.put_nowait((el1, el2, el3))
                logger.debug('Processes234: put into queue')
            except Exception as excpt:
                logger.debug(excpt)
                logger.debug("message_queue_2: queue is full")
                # the queue is full so replace the old element with the new one
                try:
                    self.message_queue_2.get_nowait()
                    self.message_queue_2.put_nowait((el1, el2, el3))
                    # in case other process already fill the queue - ignore
                except:
                    pass


# process 5:
class Process5(Process):
    def __init__(self, message_queue_2):
        Process.__init__(self)
        self.message_queue_2 = message_queue_2

    def run(self):
        while True:
            try:
                # get from queue
                el1, el2, el = self.message_queue_2.get()
                print('Process5: get from queue')

            except Exception as exp:
                print("message_queue_2: queue empty, Exception message: " + str(exp))


def start_process_1():
    # init queues
    message_queue_1 = multiprocessing.Queue(maxsize=10)
    message_queue_2 = multiprocessing.Queue(maxsize=10)

    processes_234 = [Processes_234(message_queue_1, message_queue_2)
                     for _ in range(3)]

    for proc in processes_234:
        proc.start()

    process5 = Process5(message_queue_2)
    process5.start()
    counter = 1

    while True:
        el1 = counter + 1
        el2 = counter + counter
        el3 = "some string " * ((counter ** 2) % 60000)
        counter += 1
        # start passing data
        try:

            # put into queue
            message_queue_1.put_nowait((el1, el2, el3))
            logger.debug('Process1: put into queue')
        except Exception as excpt:
            logger.debug(excpt)
            logger.debug("message_queue_1: queue is full")
            # the queue is full so replace the old element with the new one
            try:
                message_queue_1.get_nowait()
                message_queue_1.put_nowait((el1, el2, el3))
                # in case other process already fill the queue - ignore
            except:
                pass


if __name__ == '__main__':
    start_process_1()

кто-нибудь знает, в чем моя проблема?

Я использую Python 3.6.5

Не вижу немедленных проблем с тем, что вы опубликовали, но опять же, трудно попытаться воспроизвести вашу проблему, потому что мы должны делать предположения о том, как выполняется код (как выглядят ваши объекты Process* и ваш __main__). Создайте MCVE и также объясните, как именно вы определяете, что ваши объекты Process_234 перестали работать.

shmee 26.07.2018 13:42

спасибо, решил и обновил код по вашей просьбе тому, кто воспроизводит случай.

Yinon_90 07.08.2018 20:12
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
5
2
703
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Наконец-то мне удалось решить проблему, это был регистратор! Согласно библиотеке протоколирования, регистратор является потокобезопасным, но не многопроцессорным.

Я изменил код, чтобы у каждого процесса был свой логгер, и это решило проблему.

Хороший улов и спасибо за обновление вашего вопроса. Таким образом, это может помочь другим решить аналогичную проблему.

shmee 07.08.2018 23:41

Я не уверен, сколько времени мне понадобилось бы, чтобы решить эту проблему, если бы я не нашел это, спасибо!

Mazyod 08.05.2021 00:10

Другие вопросы по теме