У меня есть служба node js, которая принимает сообщения от Kafka и обрабатывает их на различных этапах логики преобразования. Во время обработки сервисы используют Redis и mongo для хранения и кеширования. В конце концов, он отправляет преобразованное сообщение другому месту назначения через UDP-пакеты.
При запуске он начинает получать сообщение от Kafka через некоторое время, он вылетает с необработанной ошибкой: ERR_CANNOT_SEND не может отправить данные (см. Рисунок ниже). перезапуск приложения временно решает проблему. Сначала я подумал, что это может быть связано с пересылкой через сокеты UDP, но места назначения пересылки доступны от потребителя!
Буду признателен за любую помощь здесь. Я как бы застрял здесь.
Потребительский код:
const readFromKafka = ({host, topic, source}, transformationService) => {
const logger = createChildLogger(`kafka-consumer-${topic}`);
const options = {
// connect directly to kafka broker (instantiates a KafkaClient)
kafkaHost: host,
groupId: `${topic}-group`,
protocol: ['roundrobin'], // and so on the other kafka config.
};
logger.info(`starting kafka consumer on ${host} for ${topic}`);
const consumer = new ConsumerGroup(options, [topic]);
consumer.on('error', (err) => logger.error(err));
consumer.on('message', async ({value, offset}) => {
logger.info(`recieved ${topic}`, value);
if (value) {
const final = await transformationService([
JSON.parse(Buffer.from(value, 'binary').toString()),
]);
logger.info('Message recieved', {instanceID: final[0].instanceId, trace: final[1]});
} else {
logger.error(`invalid message: ${topic} ${value}`);
}
return;
});
consumer.on('rebalanced', () => {
logger.info('cosumer is rebalancing');
});
return consumer;
};
Код запуска и обработки ошибок Consumer Service:
//init is the async function used to initialise the cache and other config and components.
const init = async() =>{
//initialize cache, configs.
}
//startConsumer is the async function that connects to Kafka,
//and add a callback for the onMessage listener which processes the message through the transformation service.
const startConsumer = async ({ ...config}) => {
//calls to fetch info like topic, transformationService etc.
//readFromKafka function defn pasted above
readFromKafka( {topicConfig}, transformationService);
};
init()
.then(startConsumer)
.catch((err) => {
logger.error(err);
});
Перенаправление кода через сокеты UDP. Следующий код периодически выдает необработанную ошибку, поскольку это, казалось, сработало для первых нескольких тысяч сообщений, а затем внезапно происходит сбой.
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({id, host, port}) => {
return new Promise((resolve) => {
dgram.createSocket('udp4').send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
Я не уверен, что этот комментарий помогает моему делу или нет, поскольку я открываю отдельный сокет для каждого пункта назначения для отправки сообщения в пункт назначения, поэтому возможно, что в определенный момент может быть создано около 100 отдельных сокетов. , но каждый сокет будет привязан к случайному порту (поскольку я не называл socket.bind()).
Кроме того, должен вызываться обратный вызов ошибки, который, в свою очередь, разрешает обещание с регистрацией ошибок., Если я прав?
Более крупная трассировка стека в виде текста была бы более полезной, но если ваш обратный вызов ошибки работает правильно, то это должно быть то, что регистрирует SOCKET_CANNOT_SEND, который вы вообще видите в трассировке стека. Я хочу сказать, что если я читаю комментарий, который я связал правильно, когда вы вызываете socket.send на несвязанном сокете, Node будет неявно обрабатывать привязку за вас. Однако, если это ошибка связывания / отправки, по какой-либо внутренней причине обработчик неявной привязки скрывает реальную ошибку, стоящую за ошибкой SOCKET_CANNOT_SEND. Опять же, если я правильно читаю комментарий.
Вместо этого вы можете вместо этого изменить свой код UDP на что-то вроде var mySock = dgram.createSocket('udp4'); mySock.bind(port, host, () => { mySock.send(msg, 0, msg.length, (err) => { resolve({ id, timestamp: Date.now(), logs: err || 'Sent succesfully', }); }); });, что может изменить ваше сообщение об ошибке на что-то более полезное и не столь двусмысленное, я надеюсь
К сожалению, добавленный мной моментальный снимок журналов - это все, что печатается на консоли сервера. В любом случае, я понимаю вашу точку зрения, проверю и обновлю ее. Благодарность
Исправление вышеизложенного, поскольку я неправильно понял роли клиента и сервера. Должно быть: var mySock = dgram.createSocket('udp4'); mySock.bind(0, "0.0.0.0", () => { mySock.send(msg, 0, msg.length, port, host, (err) => { resolve({ id, timestamp: Date.now(), logs: err || 'Sent succesfully' }); }); }); Не уверен, что это действительно повлияет на сообщение об ошибке, но должно, по крайней мере, привести к тому же коду.
Поцарапайте все заранее, так как после правильного перечитывания вашего вопроса я теперь могу воспроизвести эту проблему после некоторого исследования и изменения моей версии node. Вы говорите, что это отправляет тысячи сообщений ... вы создаете новый сокет каждый раз, когда отправляете пакет сообщений? И вы обязательно очищаете и закрываете свои сокеты, когда они больше не нужны? Потому что я могу воспроизвести эту ошибку в узле v10, открыв невероятное количество (например, 2 ** 15) сокетов одновременно. В более поздних версиях узла вы получаете сообщение об ошибке, которое более четко указывает на отсутствие доступных портов / сокетов.
Я вызываю dgram.createSocket('udp4') для каждого назначения сообщения. Если этот вызов открывает сокет каждый раз, тогда да, я каждый раз открываю новый сокет.





Основываясь на нашем обмене комментариями, я считаю, что проблема просто в том, что у вас не хватает ресурсов.
На протяжении всего жизненного цикла вашего приложения каждый раз, когда вы отправляете сообщение, вы открываете новый сокет. Однако после отправки этого сообщения вы не выполняете никакой очистки, и поэтому этот сокет остается открытым на неопределенный срок. Затем ваши открытые сокеты продолжают накапливаться, потребляя ресурсы, пока в конечном итоге у вас не закончится ... что-то. Возможно, память, возможно, порты, возможно, что-то еще, но в конечном итоге ваше приложение вылетает.
К счастью, решение не слишком запутанное: просто повторно используйте существующие сокеты. Фактически, вы можете просто повторно использовать один сокет для всего приложения, если хотите, так как внутри socket.send обрабатывает очереди за вас, поэтому нет необходимости делать какие-либо умные передачи. Однако, если вам нужно немного больше параллелизма, вот быстрая реализация очереди с циклическим перебором, где мы заранее создали пул из 10 сокетов, которые мы просто берем всякий раз, когда хотим отправить сообщение:
const MAX_CONCURRENT_SOCKETS = 10;
var rrIndex = 0;
const rrSocketPool = (() => {
var arr = [];
for (let i = 0; i < MAX_CONCURRENT_SOCKETS; i++) {
let sock = dgram.createSocket('udp4');
arr.push(sock);
}
return arr;
})();
const udpSender = (msg, destinations) => {
return Object.values(destinations)
.map(({ id, host, port }) => {
return new Promise((resolve) => {
var sock = rrSocketPool[rrIndex];
rrIndex = (rrIndex + 1) % MAX_CONCURRENT_SOCKETS;
sock.send(msg, 0, msg.length, port, host, (err) => {
resolve({
id,
timestamp: Date.now(),
logs: err || 'Sent succesfully',
});
});
});
});
};
Имейте в виду, что эта реализация все еще наивна по нескольким причинам, в основном потому, что до сих пор нет обработки ошибок в самих сокетах, только по их методу .send. Вы должны посмотреть документацию для получения дополнительной информации о перехвате событий, таких как события error, особенно если это производственный сервер, который должен работать бесконечно, но в основном обработка ошибок, которую вы поместили в свой обратный вызов .send, будет работать только ... если при вызове .send возникает ошибка. Если между отправкой сообщений, когда ваши сокеты простаивают, возникает некоторая ошибка системного уровня вне вашего контроля и приводит к поломке ваших сокетов, ваш сокет может затем выдать событие ошибки, которое не будет обработано (например, что происходит в вашей текущей реализации, с периодическими ошибками, которые вы видите до фатальной). В этот момент они могут быть навсегда непригодны для использования, что означает, что их следует заменить / восстановить или решить иным образом (или, в качестве альтернативы, просто принудительно перезапустите приложение и завершите его, как я :-)).
Цени его приятель.
Здесь полный снимок в темноте, но это вопрос условий гонки? См. Этот комментарий github, чтобы узнать, о чем я думаю: github.com/nodejs/help/issues/2484#issuecomment-590944091 Я недостаточно знаю об инициализации сокета, но звучит так, как будто попытка отправить в сокет до того, как вы убедились, что сокет слушает, по крайней мере, вызывает вашу реальную проблему скрытый.