Я экспериментировал с очередями служебной шины Azure в NodeJS. Я создал sender.js и listener.js на основе их примеров кода в документации. Построение очереди работает нормально. Удаление и удаление сообщений из очереди работает нормально, пока длина сообщения не достигнет 10. На этом этапе запросы на удаление из очереди возвращают пустые сообщения примерно 4 из 5 раз. Если я продолжу зацикливать запросы на удаление из очереди, в конечном итоге он выведет из очереди и удалит эти последние 10 сообщений. Но это кажется крайне неэффективным. Кто-нибудь еще сталкивался с этой проблемой?
listener.js
var azure = require('azure');
var async = require("async");
var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint"; // dev
console.info(process.env.CONNECTION_STRING);
var serviceBusService = azure.createServiceBusService(connectionString);
// var serviceBusService = azure.createServiceBusService();
exports.createQueue = function (req,res) {
var body = req.body;
serviceBusService.createQueueIfNotExists(body.queueName, function(error){
console.info(error);
if (!error){
// Queue exists
return res.send(200);
} else {
return res.send(500, error);
}
});
};
exports.sendMessageToQueue = function (req, res) {
var body = req.body;
var message = {
body: 'Test message',
customProperties: {
testproperty: 'TestValue'
}};
serviceBusService.sendQueueMessage(body.queueName, message, function(error){
if (!error){
// message sent
return res.send(200);
} else {
return res.send(500, error);
}
});
}
exports.receiveMessageFromQueue = function (req, res) {
var body = req.body;
serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
if (!error){
console.info(receivedMessage);
// Message received and deleted
return res.send(200,receivedMessage);
} else {
return res.send(500, error);
}
});
}
function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
console.info(error, receivedMessage);
// console.info(error);
if (error == 'No messages to receive') {
// call the rest of the code and have it execute after 30 seconds
setTimeout(function() {
callback(receivedMessage);
}, delayTimeIfQueueIsEmpty);
} else {
// callback immediately
callback(receivedMessage);
}
});
}
function _sendQueueMessage(queueName,message,callback) {
serviceBusService.sendQueueMessage(queueName, message, function(error){
console.info(error);
callback();
});
}
function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {
var taskHandler = function(task, done) {
_receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
if (message) {
console.info('hello ' + message.body);
}
myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
done();
});
};
var queueSize = concurrency;
var myQueue = async.queue(taskHandler, queueSize);
myQueue.drain = function() {
console.info("All the work has been done.");
}
for(var i = 0; i < concurrency; i++) {
myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
}
}
delayTimeIfQueueIsEmpty = 30000; // 30s
concurrency = 2;
queueName = "jobs";
// listen and dequeue message from azure message bus
listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName);
sender.js
var azure = require('azure');
var async = require("async");
var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint";
console.info(process.env.CONNECTION_STRING);
var serviceBusService = azure.createServiceBusService(connectionString);
exports.createQueue = function (req,res) {
var body = req.body;
serviceBusService.createQueueIfNotExists(body.queueName, function(error){
console.info(error);
if (!error){
// Queue exists
return res.send(200);
} else {
return res.send(500, error);
}
});
};
exports.sendMessageToQueue = function (req, res) {
var body = req.body;
var message = {
body: 'Test message',
customProperties: {
testproperty: 'TestValue'
}};
serviceBusService.sendQueueMessage(body.queueName, message, function(error){
if (!error){
// message sent
return res.send(200);
} else {
return res.send(500, error);
}
});
}
exports.receiveMessageFromQueue = function (req, res) {
var body = req.body;
serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
if (!error){
console.info(receivedMessage);
// Message received and deleted
return res.send(200,receivedMessage);
} else {
return res.send(500, error);
}
});
}
function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
console.info(error, receivedMessage);
// console.info(error);
if (error == 'No messages to receive') {
// call the rest of the code and have it execute after 30 seconds
setTimeout(function() {
callback(receivedMessage);
}, delayTimeIfQueueIsEmpty);
} else {
// callback immediately
callback(receivedMessage);
}
});
}
function _sendQueueMessage(queueName,message,callback) {
serviceBusService.sendQueueMessage(queueName, message, function(error){
console.info(error);
callback();
});
}
function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {
var taskHandler = function(task, done) {
_receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
if (message) {
console.info('hello ' + message.body);
}
myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
done();
});
};
var queueSize = concurrency;
var myQueue = async.queue(taskHandler, queueSize);
myQueue.drain = function() {
console.info("All the work has been done.");
}
for(var i = 0; i < concurrency; i++) {
myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
}
}
function pushMessageQueue(concurrency,queueName) {
var taskHandler = function(task, done) {
var message = {
body: String(task.id),
customProperties: {
url: task.url
}};
_sendQueueMessage(task.queueName, message, function() {
console.info('hello ' + task.id);
myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
done();
});
};
var queueSize = concurrency;
var myQueue = async.queue(taskHandler, queueSize);
myQueue.drain = function() {
console.info("All the work has been done.");
}
for(var i = 0; i < concurrency; i++) {
myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
}
}
concurrency = 2;
queueName = "jobs";
pushMessageQueue(concurrency,queueName); // push message to queue for testing: 100 messages per call





наконец-то удалось связаться со службой поддержки Azure и найти ответ. ServiceBus по умолчанию включает разбиение на разделы. При выполнении HTTP-запросов (пакет SDK NodeJS для Azure ServiceBus выполняет HTTP-вызовы REST), когда количество сообщений невелико, могут возникать разделы с разными наборами сообщений, поскольку у них не было возможности для синхронизации. Это решается путем создания новой очереди, которая отключает секционирование, или увеличения времени активности, или использования DotNet SDK, который позволяет делать запросы https.