Я работаю над проектом с Rabbitmq, я использую шаблон RPC, в основном я получаю или потребляю сообщения из очереди, выполняю некоторую обработку, а затем отправляю ответ обратно. Я использую Pika, моя цель - использовать поток для каждой задачи, поэтому для каждой задачи я создам поток специально для этой задачи. Я также читал, что лучше всего сделать только одно соединение и под ним столько каналов, сколько я хочу, но я всегда получаю эту ошибку:
'start_using нельзя вызывать из области действия '
pika.exceptions.RecursionError: start_using нельзя вызывать из области действия другого обратного вызова BlockingConnection или BlockingChannel.
Я провел небольшое исследование и обнаружил, что Pika не является потокобезопасным, и мы должны использовать для каждого потока независимое соединение и канал. но я не хочу этого делать, так как это считается плохой практикой. Поэтому я хотел спросить здесь, добился ли кто-то уже этой работы. Я также читал, что это возможно, если я не использовал BlockingConnection для создания экземпляра своего соединения, а также что есть функция с именем add_callback_threadsafe, которая может сделать это возможным. но, к сожалению, для этого нет примеров, и я читал документацию, но она сложная, и без примеров мне было трудно понять, что они хотят описать.
моя попытка состояла в том, чтобы объявить два класса. Каждый класс будет представлять собой исполнителя задач, который получает или потребляет сообщение из очереди и на основе этого выполняет некоторую обработку и возвращает ответ. моя идея состояла в том, чтобы разделить соединение rabbitmq между двумя задачами, но каждая задача получит независимый канал. в приведенном выше коде параметр Rabbit, переданный функции, представляет собой класс, который содержит некоторые переменные, такие как Connection и другие функции, такие как EventSubscriber, которые при вызове назначат новый канал и начнут потреблять сообщения от этого конкретного обмена и routingKey. Затем я объявляю поток и даю функцию подписки или потребления в качестве цели для этого потока. другой класс задач выглядит так же, как этот класс, поэтому я буду загружать только этот код. в основном классе я подключаюсь к rabbitmq и передаю его как параметр конструктору двух классов задач.
класс On_Deregistration:
def __init__(self, rabbit):
self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq
def event(self, rabbit):
self.Subscriber = rabbit.EventSubscriber(rabbit, 'testing.test', 'test', False, onDeregistrationFromHRS # this func is task listener)
def subscribeAsync(self):
self.Subscriber.subscribe() # here i call start_consuming
def start(self):
"""start Subscribtion in an Independant Thread """
thread = threading.Thread(target = self.subscribeAsync )
thread.start()
if thread.isAlive():
print("asynchronous subscription started")
Приложение класса:
def __init__(self):
self.rabbitMq = RabbitMqCommunicationInterface(host='localhost', port=5672)
firstTask = On_Deregistration(self.rabbitMq)
secondTask = secondTask(self.rabbitMq)
приложение = приложение()
pika.exceptions.RecursionError: start_using нельзя вызывать из области действия другого обратного вызова BlockingConnection или BlockingChannel.
Я искал причину этой ошибки и, очевидно, pika не является потокобезопасным, но для этого должно быть решение. может быть, не используя BlockingConnection? может быть, кто-нибудь может дать мне пример, как это сделать, потому что я пробовал и не работал. Может быть, я что-то упускаю из того, как реализовать многопоточность с помощью rabbitmq.
поэтому после долгих исследований я понял, что Pika не является потокобезопасным. ну, по крайней мере, на данный момент, возможно, в новых версиях это будет потокобезопасно. так что теперь для моего проекта я перестал использовать Pika и использую б-кролик, который является потокобезопасной оболочкой для Rabbitpy. но я должен сказать, что Pika — отличная библиотека, и я считаю, что API лучше описан и структурирован, чем rabbitpy, но для моего проекта было обязательным использовать многопоточность, и поэтому Pika на данный момент был плохим выбором. Я надеюсь, что это поможет кому-то в будущем