Я пытаюсь использовать сообщения RabbitMQ на С# с помощью библиотеки RabbitMQ.Client, получить список сообщений и добавить его в массив перед возвратом массива. После этого я надеялся обработать этот массив и отправить его в новую очередь. Итак, мой первый подход ниже:
public async Task<List<string>> ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var consumer = new EventingBasicConsumer(channel);
var messages= new List<string>();
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToString();
// Do something with the to the body...
messages.Add(body);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
return messages;
}
Кажется, что возвращаемый массив сообщений всегда пуст. Может ли кто-нибудь помочь мне понять, почему, пожалуйста?
Другой подход, который я пробовал, заключается в том, чтобы сделать это в существующем обработчике Received:
public async Task ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToString();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "NewQueue1",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var newMsgBytes[] = new Byte[] // Do some processing of message and send new message to new queue
channel.BasicPublish(exchange: string.Empty,
routingKey: "NewQueue1",
basicProperties: null,
body: newMsgBytes);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
}
Это также не работает, поскольку новые сообщения не отправляются в очередь. Любые предложения, пожалуйста? Спасибо





Попробуйте прочитать так (вместо ea.Body.ToString() используйте ea.Body.ToArray())
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
Подробнее здесь https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
Спасибо. Мне нужно объявить очередь, как я сделал, дважды? «myqueue» — это очередь, которую я использую, а «NewQueue1» — это та, в которую я отправляю? Может быть, я ошибаюсь и не нужно устанавливать второе объявление, только ключ маршрутизации для basicPublish? Извините - я новичок в этом.
В вашем первом примере вы ничего не добавляете в список messages (который возвращается), только receivedQMessages. Если вы не изменяете messages в другом месте, возможно, именно поэтому вы ничего не видите «в очереди».
Во-вторых, обратите внимание, что если вы установите для autoAck значение true, сообщение будет удалено, как только вы его получите.
вы должны преобразовать тело eventArgs в массив Это простой потребитель:
var factory = new ConnectionFactory {
HostName = "localhost"
};
//Create the RabbitMQ connection using connection factory details
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//declare the queue after mentioning name and a few property related to that
channel.QueueDeclare("myqueue", exclusive: false);
//Set Event object which listen message from chanel which is sent by producer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) => {
var body = eventArgs.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($ "myqueue message received: {message}");
};
//read the message
channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);
В первом подходе:
Событие Received является асинхронным и выполняется в отдельном потоке. Это означает, что ваш код будет продолжать выполняться до того, как событие Received завершит добавление сообщений в список messages. В результате список messages всегда будет пустым при возврате.
Используйте потокобезопасную коллекцию, например ConcurrentQueue<T>, для хранения сообщений.
так:
public async Task<List<string>> ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var messages = new ConcurrentQueue<string>();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// Do something with the message...
messages.Enqueue(message);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
return messages.ToList();
}
Просто наблюдение - вам не нужно использовать
asyncчастьconsumer.Received += async, так как вы не ожидаете никаких асинхронных операций.