Некоторое время я использую RabbitMQ. У меня есть несколько микросервисов, работающих с использованием базового механизма RPC, очень близкого к описанному в Уроки кролика. Я пытаюсь перейти на AMQP 1.0 с помощью rhea, потому что мне нужно использовать Amazon MQ. Однако я все еще пытаюсь воспроизвести этот простой шаблон:
ch.assertQueue('', {exclusive: true}, function(err, q) {
let corr = //some UUID
ch.consume(q.queue, function(msg) {
/* */
});
ch.sendToQueue('rpc_queue',
"TEST2",
{ correlationId: corr, replyTo: q.queue });
});
})
Чего я не получаю от rhea, так это возможности иметь временную очередь (связанную с подключением клиента) и нажимать "replyTo" для этих очередей.
Я пробовал:
client.open_receiver({
source: { address: "rpc:callback", expiry_policy: "connection-close" }
});
используя expiry_policy
, но он не работает. Я пробую даже RabbitMQ с плагином AMQP 1.0, а затем Apache ActiveMQ.
Дело в том, что я хотел бы ...
Однако я не могу ни получить временную очередь (кроме AMQP 0.9.1), ни использовать это имя для адресации ответа.
@vrachlin давным-давно я сделал то, что только что написал ниже
const container = require("rhea");
const _logger = require("pino")();
const nanoid = require("nanoid");
const init = ({ config, caller, resources, services, rpcs }) => {
return new Promise((resolve, reject) => {
let _rpcs = {};
let _responses = {};
const send = (sender, receiver, correlation_id, body) => {
if (receiver.source.address) {
sender.send({
reply_to: receiver.source.address,
correlation_id,
body
});
}
};
container.on("connection_open", context => {
//RPCS
rpcs &&
rpcs.forEach(sendTo => {
let parts = sendTo.name.split(".");
_rpcs[parts[0]] = _rpcs[parts[0]] ? _rpcs[parts[0]] : {};
let sender = context.connection.open_sender(sendTo.name);
let receiver = context.connection.open_receiver({
source: { dynamic: true }
});
receiver.on("message", context => {
let correlation_id = context.message.correlation_id;
if (_responses[correlation_id]) {
let { resolve, reject } = _responses[correlation_id];
resolve(context.message.body);
delete _responses[correlation_id];
}
});
_rpcs[parts[0]][parts[1]] = body =>
new Promise((resolve, reject) => {
const correlation_id = nanoid();
_responses[correlation_id] = { resolve, reject };
send(sender, receiver, correlation_id, body);
});
});
// SERVICES
services &&
services.forEach(service => {
let receiver = context.connection.open_receiver({
source: `${resources.name}.${service.name}`,
//credit_window: 1, //service.prefetch || 500,
autoaccept: false
});
receiver.on("message", async context => {
let request = context.message;
let reply_to = request.reply_to;
let payload = request.body;
try {
let response = {
to: reply_to,
body: await caller(service.responder)({ payload })
};
if (request.correlation_id) {
response.correlation_id = request.correlation_id;
}
context.connection.send(response);
context.delivery.accept();
} catch (error) {
_logger.error(error);
context.delivery.reject();
}
});
});
});
container.on("receiver_open", context => {
resolve(_rpcs);
});
container.on("connection_error", error => _logger.error(error));
container.connect(config.getResource("amqp"));
});
};
module.exports = { init };
По сути, все сводится к этому потоку. Читайте снизу вверх, так как это логический ход вещей. Вы начинаете с запуска вашего слушателя, который создаст динамическую очередь со случайным именем. Затем вы открываете отправителя и имя ответа в очередь от переданного context
.
// 'conn' is the rhea connection you have already created
new Promise((resolve, reject) => {
let replyToQueue;
conn.on('message', (context) => {
// you have received your message
resolve(context.message);
});
conn.once('sendable', (context) => {
// send a message with a reply_to header
context.sender.send({
reply_to: replyToQueue,
body: 'some message content'
});
});
conn.on('receiver_open', (context) => {
// capture the name of that dynamically named queue here
replyToQueue = context.receiver.source.address;
conn.open_sender('queue://send.to');
});
// start listening to a dynamically named temporary queue
conn.open_receiver({ source: { dynamic: true } });
}
Привет, ты решил это? можешь поделиться как?