Запрос на удаление из очереди в служебной шине Azure, когда длина очереди <10, часто возвращает значение null

Я экспериментировал с очередями служебной шины Azure в NodeJS. Я создал sender.js и listener.js на основе их примеров кода в документации. Построение очереди работает нормально. Удаление и удаление сообщений из очереди работает нормально, пока длина сообщения не достигнет 10. На этом этапе запросы на удаление из очереди возвращают пустые сообщения примерно 4 из 5 раз. Если я продолжу зацикливать запросы на удаление из очереди, в конечном итоге он выведет из очереди и удалит эти последние 10 сообщений. Но это кажется крайне неэффективным. Кто-нибудь еще сталкивался с этой проблемой?

listener.js

var azure = require('azure');
var async = require("async");

var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint"; // dev

console.info(process.env.CONNECTION_STRING);

var serviceBusService = azure.createServiceBusService(connectionString);
// var serviceBusService = azure.createServiceBusService();

exports.createQueue = function (req,res) {

    var body = req.body;

    serviceBusService.createQueueIfNotExists(body.queueName, function(error){
        console.info(error);
        if (!error){
            // Queue exists
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
};

exports.sendMessageToQueue = function (req, res) {
    var body = req.body;

    var message = {
        body: 'Test message',
        customProperties: {
            testproperty: 'TestValue'
        }};

    serviceBusService.sendQueueMessage(body.queueName, message, function(error){
        if (!error){
            // message sent
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
}

exports.receiveMessageFromQueue = function (req, res) {
    var body = req.body;

    serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
        if (!error){
            console.info(receivedMessage);

            // Message received and deleted
            return res.send(200,receivedMessage);
        }  else {
            return res.send(500, error);   
        }
    });
}

function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
    serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
        console.info(error, receivedMessage);
        // console.info(error);
        if (error == 'No messages to receive') {
            // call the rest of the code and have it execute after 30 seconds
            setTimeout(function() {
                callback(receivedMessage);
            }, delayTimeIfQueueIsEmpty);
        } else {
            // callback immediately
            callback(receivedMessage);
        }
    });
}

function _sendQueueMessage(queueName,message,callback) {
    serviceBusService.sendQueueMessage(queueName, message, function(error){
        console.info(error);
        callback();
    });
}

function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {

    var taskHandler = function(task, done) {
        _receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
            if (message) {
                console.info('hello ' + message.body);
            }

            myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});

            done();
        });
      };

    var queueSize = concurrency;

    var myQueue = async.queue(taskHandler, queueSize);

    myQueue.drain = function() {
        console.info("All the work has been done.");
    }

    for(var i = 0; i < concurrency; i++) {
        myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
    }

}

delayTimeIfQueueIsEmpty = 30000; // 30s
concurrency = 2;
queueName = "jobs";
// listen and dequeue message from azure message bus
listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName);

sender.js

var azure = require('azure');
var async = require("async");

var connectionString = process.env.CONNECTION_STRING || "Endpoint=sb://endpoint";

console.info(process.env.CONNECTION_STRING);

var serviceBusService = azure.createServiceBusService(connectionString);

exports.createQueue = function (req,res) {

    var body = req.body;

    serviceBusService.createQueueIfNotExists(body.queueName, function(error){
        console.info(error);
        if (!error){
            // Queue exists
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
};

exports.sendMessageToQueue = function (req, res) {
    var body = req.body;

    var message = {
        body: 'Test message',
        customProperties: {
            testproperty: 'TestValue'
        }};

    serviceBusService.sendQueueMessage(body.queueName, message, function(error){
        if (!error){
            // message sent
            return res.send(200);
        } else {
            return res.send(500, error);   
        }
    });
}

exports.receiveMessageFromQueue = function (req, res) {
    var body = req.body;

    serviceBusService.receiveQueueMessage(body.queueName, function(error, receivedMessage){
        if (!error){
            console.info(receivedMessage);

            // Message received and deleted
            return res.send(200,receivedMessage);
        }  else {
            return res.send(500, error);   
        }
    });
}

function _receiveMessageFromQueue(queueName,delayTimeIfQueueIsEmpty,callback) {
    serviceBusService.receiveQueueMessage(queueName, function(error, receivedMessage){
        console.info(error, receivedMessage);
        // console.info(error);
        if (error == 'No messages to receive') {
            // call the rest of the code and have it execute after 30 seconds
            setTimeout(function() {
                callback(receivedMessage);
            }, delayTimeIfQueueIsEmpty);
        } else {
            // callback immediately
            callback(receivedMessage);
        }
    });
}

function _sendQueueMessage(queueName,message,callback) {
    serviceBusService.sendQueueMessage(queueName, message, function(error){
        console.info(error);
        callback();
    });
}

function listenMessageQueue(concurrency,delayTimeIfQueueIsEmpty,queueName) {

    var taskHandler = function(task, done) {
        _receiveMessageFromQueue(task.queueName, delayTimeIfQueueIsEmpty, function(message) {
            if (message) {
                console.info('hello ' + message.body);
            }

            myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});

            done();
        });
      };

    var queueSize = concurrency;

    var myQueue = async.queue(taskHandler, queueSize);

    myQueue.drain = function() {
        console.info("All the work has been done.");
    }

    for(var i = 0; i < concurrency; i++) {
        myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
    }

}

function pushMessageQueue(concurrency,queueName) {

    var taskHandler = function(task, done) {

        var message = {
            body: String(task.id),
            customProperties: {
                url: task.url
            }};

        _sendQueueMessage(task.queueName, message, function() {
            console.info('hello ' + task.id);
            myQueue.push({ id: task.id + 1, queueName: queueName, url: "http://localhost/get-person/" + task.id + 1});
            done();
        });
      };

    var queueSize = concurrency;

    var myQueue = async.queue(taskHandler, queueSize);

    myQueue.drain = function() {
        console.info("All the work has been done.");
    }

    for(var i = 0; i < concurrency; i++) {
        myQueue.push({ id: i, queueName: queueName, url: "http://localhost/get-person/"+i });
    }

}

concurrency = 2;
queueName = "jobs";
pushMessageQueue(concurrency,queueName); // push message to queue for testing: 100 messages per call
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
188
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

наконец-то удалось связаться со службой поддержки Azure и найти ответ. ServiceBus по умолчанию включает разбиение на разделы. При выполнении HTTP-запросов (пакет SDK NodeJS для Azure ServiceBus выполняет HTTP-вызовы REST), когда количество сообщений невелико, могут возникать разделы с разными наборами сообщений, поскольку у них не было возможности для синхронизации. Это решается путем создания новой очереди, которая отключает секционирование, или увеличения времени активности, или использования DotNet SDK, который позволяет делать запросы https.

Другие вопросы по теме