Это была задача из книги под названием "Node.js 8 правильный путь". Вы можете увидеть это ниже:
Это мое решение:
'use strict';
const zmq = require('zeromq');
const cluster = require('cluster');
const push = zmq.socket('push');
const pull = zmq.socket('pull');
const cores_num = require('os').cpus().length;
let workers_num = 0;
push.bind('tcp://127.0.0.1:9998');
pull.bind('tcp://127.0.0.1:9999');
// push.bind('ipc://push.ipc');
// pull.bind('ipc://pull.ipc');
if (cluster.isMaster) {
for (let j = 0; j < cores_num; j++) {
cluster.fork();
}
cluster.on('online', (worker) => {
console.info(`Worker with pid ${worker.process.pid} is created!`);
});
pull.on('message', (data) => {
const response = JSON.parse(data.toString());
if (response.type === 'ready') {
if (workers_num >= 0 && workers_num < 3) {
workers_num++;
if (workers_num == 3) {
console.info('Ready!');
for (let i = 0; i < 10; i++) {
push.send(JSON.stringify({
type: 'job',
data: `This message has id ${i}`
}));
}
}
}
} else if (response.type === 'result') {
console.info(data.toString());
}
});
} else {
const worker_push = zmq.socket('push');
const worker_pull = zmq.socket('pull');
worker_pull.connect('tcp://127.0.0.1:9998');
worker_push.connect('tcp://127.0.0.1:9999');
// worker_pull.connect('ipc://push.ipc');
// worker_push.connect('ipc://pull.ipc');
worker_push.send(JSON.stringify({
type: 'ready'
}));
worker_pull.on('message', data => {
const request = JSON.parse(data);
if (request.type === 'job') {
console.info(`Process ${process.pid} got message ${request.data}`);
worker_push.send(JSON.stringify({
type: 'result',
data: `This message is a response from process ${process.pid}`,
time: Date.now()
}));
}
});
}
Как видите, это работает только тогда, когда сокеты PUSH/PULL и рабочие взаимодействуют через TCP, но я хочу знать, почему это не работает через IPC.
Обновлять (ссылка: условие 4 ниже "путь должен быть доступным для записи"):
Я надеюсь, что вы поможете мне с поиском проблемы.





I want to know the reason why it doesn't work via IPC.
Есть несколько условий для использования транспортного класса ipc:// для использования масштабируемых формальных коммуникационных архетипов ZeroMQ и получения .bind()/.connect()-ed.
1) Межпроцессный транспорт передает сообщения между локальными процессами, используя зависящий от системы механизм IPC. Межпроцессный транспорт в настоящее время реализовано только в операционных системах, предоставляющих сокеты домена UNIX.
2) И сторона .bind(), и сторона .connect() должны встретиться при некотором допустимом адрес:
push.bind( 'ipc://push.sock' ); // will never meet its counterparty
// ------------------(--|||://^v^v^v^v^v^v^v )
worker_pull.connect( 'ipc:///tmp/push.sock'); // if used other ipc://-address
3) Если какой-либо второй процесс привязывается к конечной точке, уже связанной процессом, это будет успешным, и первый процесс потеряет свою привязку. В этом поведении транспортный класс ipc:// не соответствует транспортным классам tcp:// или inproc://.
4) Путь к адресу конечной точки должен должен быть доступен для записи процессом. Когда конечная точка начинается с /, например, ipc:///pathname, это будет абсолютный путь. Если конечная точка указывает несуществующий каталог, привязка будет неудачной.
5) Если путь к адресу конечной точки начинается с @, должно использоваться абстрактное пространство имен. Абстрактное пространство имен не зависит от файловой системы, и если процесс попытается связать конечную точку, уже связанную процессом, это не удастся. Подробности смотрите в unix(7).
6) ipc:// Пути транспортного класса имеют максимальный размер, который зависит от операционной системы. В Linux максимум 113 символов, включая префикс «ipc://» (107 символов для реального пути).
7) Когда в zmq_bind() использовалась спецификация конечной точки с подстановочным знаком *, вызывающая сторона должен использует реальную конечную точку, полученную из параметра сокета ZMQ_LAST_ENDPOINT, чтобы отвязать эту конечную точку от сокета с помощью zmq_unbind().
Если все пункты 1-4 были проверены и выполнены положительно, то есть причина подать заявку, относящуюся к транспортному классу ipc://, в очередь устранения неполадок сопровождающих.
У вас есть ipc://push.ipc (2 слеша) вам очень нужно ipc:///push.ipc
Протокол ipc://, тогда вам нужен путь к файлу /push.ipc
У вашего процесса есть разрешение на запись в корневой каталог? Если вы не работаете как root, я бы так не подумал.
Я бы изменил путь на что-то вроде /tmp/push.ipc, который в большинстве систем доступен для записи всем пользователям.
В этом случае ваш URL должен быть:
ipc:///tmp/push.ipc
Я вообще не использую узел, но, основываясь на своих знаниях о разветвлениях других языков, я думаю, что вся программа снова запускается в другом процессе/потоке.
В этом случае не каждый работник пытается bind() снова, так как код создания/привязки сокета находится за пределами if (cluster.isMaster) {
Это должно выглядеть так, я думаю
if (cluster.isMaster) {
const push = zmq.socket('push');
const pull = zmq.socket('pull');
push.bind('ipc://push.ipc');
pull.bind('ipc://pull.ipc');
....
}
Вы видите файлы, созданные на диске (например, /tmp/push.ipc)? Если они есть, сделайте ls -l, чтобы мы могли увидеть разрешения
Вот оно, ребята: pp.userapi.com/c849332/v849332599/1dd387/JWw_dDhdfqU.jpg
файлы ipc выглядят нормально, но у меня есть вопрос о вашем разветвлении и сокетах. Я обновил свой ответ, так как ему нужно больше места для объяснения
Вы были абсолютно правы, спасибо! Проблема заключалась в том, что я привязывал сокеты вне родительского процесса. Таким образом, каждый воркер запускал эти строчки кода и привязывал push/pull сокеты, что было очень грубой ошибкой.
В любом случае, интересно, как, черт возьми, это работало через TCP-сокеты. .-.
В случае TCP, если привязка мастера происходит раньше рабочих, рабочие просто не смогут выполнить привязку, но продолжат работу. IPC немного сложнее, и несколько связываний иногда дают неопределенные результаты. Если вы довольны ответом, можете ли вы проголосовать за него.
Да, ты прав. У меня есть ошибка с путями сокетов, но это ничего не изменило. :) Кстати, моя машина работает на дистрибутиве Ubuntu, так что все должно быть в порядке. У вас есть идеи, почему это все еще не работает?