Я использую очереди из библиотеки многопроцессорности для обмена данными между процессами.
У меня 2 очереди, обе ограничены 10 объектами, в первой очереди есть один процесс, который «помещает» в нее объекты, и многие процессы «получают» от него.
Во второй очереди есть много процессов, которые «помещают» в нее объекты, и только один процесс «получает» от нее.
Система какое-то время работает отлично, а потом начинает вести себя странно: только процесс, который «помещает» объекты в первую очередь, продолжает работать, пока процессы, которые читают из первой очереди, по-видимому, больше не ведут себя / не работают (даже если процессы живы). Кажется, здесь тупик, но я не уверен, вот мой код:
import multiprocessing
import logging
from multiprocessing import Process
logger = logging.get_logger(__name__)
# Processes 2, 3 ,4:
class Processes_234(Process):
def __init__(self, message_queue_1, message_queue_2):
Process.__init__(self)
self.message_queue_1 = message_queue_1
self.message_queue_2 = message_queue_2
def run(self):
while True:
try:
# get from queue
el1, el2, el3 = self.message_queue_1.get()
logger.debug('Processes234: get from queue')
except Exception as exp:
logger.debug("message_queue_1: queue empty, Exception message: " + str(exp))
# do some stuff with el1, el2, el3...
try:
# put into second queue
self.message_queue_2.put_nowait((el1, el2, el3))
logger.debug('Processes234: put into queue')
except Exception as excpt:
logger.debug(excpt)
logger.debug("message_queue_2: queue is full")
# the queue is full so replace the old element with the new one
try:
self.message_queue_2.get_nowait()
self.message_queue_2.put_nowait((el1, el2, el3))
# in case other process already fill the queue - ignore
except:
pass
# process 5:
class Process5(Process):
def __init__(self, message_queue_2):
Process.__init__(self)
self.message_queue_2 = message_queue_2
def run(self):
while True:
try:
# get from queue
el1, el2, el = self.message_queue_2.get()
print('Process5: get from queue')
except Exception as exp:
print("message_queue_2: queue empty, Exception message: " + str(exp))
def start_process_1():
# init queues
message_queue_1 = multiprocessing.Queue(maxsize=10)
message_queue_2 = multiprocessing.Queue(maxsize=10)
processes_234 = [Processes_234(message_queue_1, message_queue_2)
for _ in range(3)]
for proc in processes_234:
proc.start()
process5 = Process5(message_queue_2)
process5.start()
counter = 1
while True:
el1 = counter + 1
el2 = counter + counter
el3 = "some string " * ((counter ** 2) % 60000)
counter += 1
# start passing data
try:
# put into queue
message_queue_1.put_nowait((el1, el2, el3))
logger.debug('Process1: put into queue')
except Exception as excpt:
logger.debug(excpt)
logger.debug("message_queue_1: queue is full")
# the queue is full so replace the old element with the new one
try:
message_queue_1.get_nowait()
message_queue_1.put_nowait((el1, el2, el3))
# in case other process already fill the queue - ignore
except:
pass
if __name__ == '__main__':
start_process_1()
кто-нибудь знает, в чем моя проблема?
Я использую Python 3.6.5
спасибо, решил и обновил код по вашей просьбе тому, кто воспроизводит случай.






Наконец-то мне удалось решить проблему, это был регистратор! Согласно библиотеке протоколирования, регистратор является потокобезопасным, но не многопроцессорным.
Я изменил код, чтобы у каждого процесса был свой логгер, и это решило проблему.
Хороший улов и спасибо за обновление вашего вопроса. Таким образом, это может помочь другим решить аналогичную проблему.
Я не уверен, сколько времени мне понадобилось бы, чтобы решить эту проблему, если бы я не нашел это, спасибо!
Не вижу немедленных проблем с тем, что вы опубликовали, но опять же, трудно попытаться воспроизвести вашу проблему, потому что мы должны делать предположения о том, как выполняется код (как выглядят ваши объекты
Process*и ваш__main__). Создайте MCVE и также объясните, как именно вы определяете, что ваши объектыProcess_234перестали работать.