Приложение Node JS аварийно завершает работу с ошибкой ERR_SOCKET_CANNOT_SEND

У меня есть служба node js, которая принимает сообщения от Kafka и обрабатывает их на различных этапах логики преобразования. Во время обработки сервисы используют Redis и mongo для хранения и кеширования. В конце концов, он отправляет преобразованное сообщение другому месту назначения через UDP-пакеты.

При запуске он начинает получать сообщение от Kafka через некоторое время, он вылетает с необработанной ошибкой: ERR_CANNOT_SEND не может отправить данные (см. Рисунок ниже). перезапуск приложения временно решает проблему. Сначала я подумал, что это может быть связано с пересылкой через сокеты UDP, но места назначения пересылки доступны от потребителя!

Буду признателен за любую помощь здесь. Я как бы застрял здесь.

Приложение Node JS аварийно завершает работу с ошибкой ERR_SOCKET_CANNOT_SEND

Потребительский код:

const readFromKafka =  ({host, topic, source}, transformationService) => {
    const logger = createChildLogger(`kafka-consumer-${topic}`);
    const options = {
        // connect directly to kafka broker (instantiates a KafkaClient)
        kafkaHost: host,
        groupId: `${topic}-group`,
        protocol: ['roundrobin'], // and so on the  other kafka config.
    };

    logger.info(`starting kafka consumer on ${host} for ${topic}`);
    const consumer = new ConsumerGroup(options, [topic]);
    consumer.on('error', (err) => logger.error(err));
    consumer.on('message', async ({value, offset}) => {
        logger.info(`recieved ${topic}`, value);
        if (value) {
            const final = await transformationService([
                JSON.parse(Buffer.from(value, 'binary').toString()),
            ]);
            logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
         
        } else {
            logger.error(`invalid message: ${topic} ${value}`);
        }
        return;
    });
    consumer.on('rebalanced', () => {
        logger.info('cosumer is rebalancing');
    });
    return consumer;
};

Код запуска и обработки ошибок Consumer Service:

//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
    //initialize cache, configs.
}

//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
    //calls to fetch info like topic, transformationService etc.
   //readFromKafka function defn pasted above
    readFromKafka( {topicConfig}, transformationService);
};

init()
    .then(startConsumer)
    .catch((err) => {
        logger.error(err);
    });

Перенаправление кода через сокеты UDP. Следующий код периодически выдает необработанную ошибку, поскольку это, казалось, сработало для первых нескольких тысяч сообщений, а затем внезапно происходит сбой.

const udpSender = (msg, destinations) => {
    return Object.values(destinations)
        .map(({id, host, port}) => {
            return new Promise((resolve) => {
                dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
                    resolve({
                        id,
                        timestamp: Date.now(),
                        logs: err || 'Sent succesfully',
                    });
                });
            });
        });
};

Здесь полный снимок в темноте, но это вопрос условий гонки? См. Этот комментарий github, чтобы узнать, о чем я думаю: github.com/nodejs/help/issues/2484#issuecomment-590944091 Я недостаточно знаю об инициализации сокета, но звучит так, как будто попытка отправить в сокет до того, как вы убедились, что сокет слушает, по крайней мере, вызывает вашу реальную проблему скрытый.

user3781737 05.04.2021 18:11

Я не уверен, что этот комментарий помогает моему делу или нет, поскольку я открываю отдельный сокет для каждого пункта назначения для отправки сообщения в пункт назначения, поэтому возможно, что в определенный момент может быть создано около 100 отдельных сокетов. , но каждый сокет будет привязан к случайному порту (поскольку я не называл socket.bind()).

Pancham Kumar 05.04.2021 20:59

Кроме того, должен вызываться обратный вызов ошибки, который, в свою очередь, разрешает обещание с регистрацией ошибок., Если я прав?

Pancham Kumar 06.04.2021 10:29

Более крупная трассировка стека в виде текста была бы более полезной, но если ваш обратный вызов ошибки работает правильно, то это должно быть то, что регистрирует SOCKET_CANNOT_SEND, который вы вообще видите в трассировке стека. Я хочу сказать, что если я читаю комментарий, который я связал правильно, когда вы вызываете socket.send на несвязанном сокете, Node будет неявно обрабатывать привязку за вас. Однако, если это ошибка связывания / отправки, по какой-либо внутренней причине обработчик неявной привязки скрывает реальную ошибку, стоящую за ошибкой SOCKET_CANNOT_SEND. Опять же, если я правильно читаю комментарий.

user3781737 06.04.2021 21:19

Вместо этого вы можете вместо этого изменить свой код UDP на что-то вроде var mySock = dgram.createSocket('udp4'); mySock.bind(port, host, () => { mySock.send(msg, 0, msg.length, (err) => { resolve({ id, timestamp: Date.now(), logs: err || 'Sent succesfully', }); }); });, что может изменить ваше сообщение об ошибке на что-то более полезное и не столь двусмысленное, я надеюсь

user3781737 06.04.2021 21:27

К сожалению, добавленный мной моментальный снимок журналов - это все, что печатается на консоли сервера. В любом случае, я понимаю вашу точку зрения, проверю и обновлю ее. Благодарность

Pancham Kumar 06.04.2021 21:29

Исправление вышеизложенного, поскольку я неправильно понял роли клиента и сервера. Должно быть: var mySock = dgram.createSocket('udp4'); mySock.bind(0, "0.0.0.0", () => { mySock.send(msg, 0, msg.length, port, host, (err) => { resolve({ id, timestamp: Date.now(), logs: err || 'Sent succesfully' }); }); }); Не уверен, что это действительно повлияет на сообщение об ошибке, но должно, по крайней мере, привести к тому же коду.

user3781737 06.04.2021 21:56

Поцарапайте все заранее, так как после правильного перечитывания вашего вопроса я теперь могу воспроизвести эту проблему после некоторого исследования и изменения моей версии node. Вы говорите, что это отправляет тысячи сообщений ... вы создаете новый сокет каждый раз, когда отправляете пакет сообщений? И вы обязательно очищаете и закрываете свои сокеты, когда они больше не нужны? Потому что я могу воспроизвести эту ошибку в узле v10, открыв невероятное количество (например, 2 ** 15) сокетов одновременно. В более поздних версиях узла вы получаете сообщение об ошибке, которое более четко указывает на отсутствие доступных портов / сокетов.

user3781737 07.04.2021 00:12

Я вызываю dgram.createSocket('udp4') для каждого назначения сообщения. Если этот вызов открывает сокет каждый раз, тогда да, я каждый раз открываю новый сокет.

Pancham Kumar 07.04.2021 08:34
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
9
69
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Основываясь на нашем обмене комментариями, я считаю, что проблема просто в том, что у вас не хватает ресурсов.

На протяжении всего жизненного цикла вашего приложения каждый раз, когда вы отправляете сообщение, вы открываете новый сокет. Однако после отправки этого сообщения вы не выполняете никакой очистки, и поэтому этот сокет остается открытым на неопределенный срок. Затем ваши открытые сокеты продолжают накапливаться, потребляя ресурсы, пока в конечном итоге у вас не закончится ... что-то. Возможно, память, возможно, порты, возможно, что-то еще, но в конечном итоге ваше приложение вылетает.

К счастью, решение не слишком запутанное: просто повторно используйте существующие сокеты. Фактически, вы можете просто повторно использовать один сокет для всего приложения, если хотите, так как внутри socket.send обрабатывает очереди за вас, поэтому нет необходимости делать какие-либо умные передачи. Однако, если вам нужно немного больше параллелизма, вот быстрая реализация очереди с циклическим перебором, где мы заранее создали пул из 10 сокетов, которые мы просто берем всякий раз, когда хотим отправить сообщение:

const MAX_CONCURRENT_SOCKETS = 10;

var rrIndex = 0;

const rrSocketPool = (() => {
    var arr = [];
    for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
        let sock = dgram.createSocket('udp4');
        arr.push(sock);
    }
    return arr;
})();

const udpSender = (msg, destinations) => {
    return Object.values(destinations)
        .map(({ id, host, port }) => {
            return new Promise((resolve) => {
                var sock = rrSocketPool[rrIndex];
                rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
                
                sock.send(msg, 0, msg.length, port, host, (err) => {
                    resolve({
                        id,
                        timestamp: Date.now(),
                        logs: err || 'Sent succesfully',
                    });
                });
            });
        });
};

Имейте в виду, что эта реализация все еще наивна по нескольким причинам, в основном потому, что до сих пор нет обработки ошибок в самих сокетах, только по их методу .send. Вы должны посмотреть документацию для получения дополнительной информации о перехвате событий, таких как события error, особенно если это производственный сервер, который должен работать бесконечно, но в основном обработка ошибок, которую вы поместили в свой обратный вызов .send, будет работать только ... если при вызове .send возникает ошибка. Если между отправкой сообщений, когда ваши сокеты простаивают, возникает некоторая ошибка системного уровня вне вашего контроля и приводит к поломке ваших сокетов, ваш сокет может затем выдать событие ошибки, которое не будет обработано (например, что происходит в вашей текущей реализации, с периодическими ошибками, которые вы видите до фатальной). В этот момент они могут быть навсегда непригодны для использования, что означает, что их следует заменить / восстановить или решить иным образом (или, в качестве альтернативы, просто принудительно перезапустите приложение и завершите его, как я :-)).

Цени его приятель.

Pancham Kumar 08.04.2021 21:55

Другие вопросы по теме