Как использовать многопоточность с pika и rabbitmq для выполнения запросов и ответов RPC-сообщений

Я работаю над проектом с Rabbitmq, я использую шаблон RPC, в основном я получаю или потребляю сообщения из очереди, выполняю некоторую обработку, а затем отправляю ответ обратно. Я использую Pika, моя цель - использовать поток для каждой задачи, поэтому для каждой задачи я создам поток специально для этой задачи. Я также читал, что лучше всего сделать только одно соединение и под ним столько каналов, сколько я хочу, но я всегда получаю эту ошибку:
'start_using нельзя вызывать из области действия ' pika.exceptions.RecursionError: start_using нельзя вызывать из области действия другого обратного вызова BlockingConnection или BlockingChannel.

Я провел небольшое исследование и обнаружил, что Pika не является потокобезопасным, и мы должны использовать для каждого потока независимое соединение и канал. но я не хочу этого делать, так как это считается плохой практикой. Поэтому я хотел спросить здесь, добился ли кто-то уже этой работы. Я также читал, что это возможно, если я не использовал BlockingConnection для создания экземпляра своего соединения, а также что есть функция с именем add_callback_threadsafe, которая может сделать это возможным. но, к сожалению, для этого нет примеров, и я читал документацию, но она сложная, и без примеров мне было трудно понять, что они хотят описать.

моя попытка состояла в том, чтобы объявить два класса. Каждый класс будет представлять собой исполнителя задач, который получает или потребляет сообщение из очереди и на основе этого выполняет некоторую обработку и возвращает ответ. моя идея состояла в том, чтобы разделить соединение rabbitmq между двумя задачами, но каждая задача получит независимый канал. в приведенном выше коде параметр Rabbit, переданный функции, представляет собой класс, который содержит некоторые переменные, такие как Connection и другие функции, такие как EventSubscriber, которые при вызове назначат новый канал и начнут потреблять сообщения от этого конкретного обмена и routingKey. Затем я объявляю поток и даю функцию подписки или потребления в качестве цели для этого потока. другой класс задач выглядит так же, как этот класс, поэтому я буду загружать только этот код. в основном классе я подключаюсь к rabbitmq и передаю его как параметр конструктору двух классов задач.

класс On_Deregistration:

def __init__(self, rabbit):
   self.event(rabbit) # this will call event function and pass the connection shared between all Tasks. rabbit parameter hold a connection to rabbitmq

def event(self, rabbit):
 
    self.Subscriber = rabbit.EventSubscriber(rabbit,  'testing.test',  'test', False,  onDeregistrationFromHRS # this func is task listener)

def subscribeAsync(self):
    self.Subscriber.subscribe() # here i call start_consuming

def start(self):
    """start Subscribtion in an Independant Thread  """
    thread = threading.Thread(target = self.subscribeAsync )  
    thread.start()
    if thread.isAlive():
        print("asynchronous subscription started")

ОСНОВНОЙ класс:

Приложение класса:

def __init__(self):

    self.rabbitMq = RabbitMqCommunicationInterface(host='localhost', port=5672)
    firstTask =  On_Deregistration(self.rabbitMq)
    secondTask =  secondTask(self.rabbitMq)

приложение = приложение()

ошибка: 'start_using не может быть вызван из области действия '

pika.exceptions.RecursionError: start_using нельзя вызывать из области действия другого обратного вызова BlockingConnection или BlockingChannel.

Я искал причину этой ошибки и, очевидно, pika не является потокобезопасным, но для этого должно быть решение. может быть, не используя BlockingConnection? может быть, кто-нибудь может дать мне пример, как это сделать, потому что я пробовал и не работал. Может быть, я что-то упускаю из того, как реализовать многопоточность с помощью rabbitmq.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
2 115
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

поэтому после долгих исследований я понял, что Pika не является потокобезопасным. ну, по крайней мере, на данный момент, возможно, в новых версиях это будет потокобезопасно. так что теперь для моего проекта я перестал использовать Pika и использую б-кролик, который является потокобезопасной оболочкой для Rabbitpy. но я должен сказать, что Pika — отличная библиотека, и я считаю, что API лучше описан и структурирован, чем rabbitpy, но для моего проекта было обязательным использовать многопоточность, и поэтому Pika на данный момент был плохим выбором. Я надеюсь, что это поможет кому-то в будущем

Другие вопросы по теме