NodeJS|Cluster: How to send data from master to all or single child/workers?
Because cluster.fork is implemented on top of child_process.fork, you can send messages from the master to a worker with worker.send({ msg: 'test' }), and from a worker to the master with process.send({ msg: 'test' }). You receive the messages like so: worker.on('message', callback) in the master (for messages sent by that worker) and process.on('message', callback) in the worker (for messages sent by the master).
Here's my full example. You can test it by browsing to http://localhost:8000/; the worker that handles the request will then send a message to the master, and the master will reply:
var cluster = require('cluster');
var http = require('http');
/**
 * Attach the master-side chat handler to one worker.
 *
 * The worker is received as a parameter so that every handler closes over its
 * own worker. With a single shared variable, each cluster.fork() would
 * overwrite it and every reply would go to the last worker forked instead of
 * the worker that sent the message.
 *
 * @param {Object} worker - a worker object returned by cluster.fork()
 */
function attachChatHandler(worker) {
  worker.on('message', function (msg) {
    // we only want to intercept messages that have a chat property
    if (msg && msg.chat) {
      console.log('Worker to master: ', msg.chat);
      // Reply to the worker that sent this message.
      worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
    }
  });
}

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    attachChatHandler(cluster.fork());
  }
} else {
  // Messages sent by the master arrive on the worker's own process object.
  process.on('message', function (msg) {
    // we only want to intercept messages that have a chat property
    if (msg && msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });

  // Worker processes have a http server.
  http.createServer(function (req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}
I found this thread while looking for a way to send a message to all child processes and was thankfully able to figure it out thanks to the comments about arrays. Just wanted to illustrate a potential solution for sending a message to all child processes utilizing this approach.
var cluster = require('cluster');
var http = require('http');
// Number of HTTP requests the workers have reported to the master.
var numReqs = 0;
// Every worker forked by the master. Nothing removes entries from this list,
// so a worker that has exited stays in it.
var workers = [];

if (cluster.isMaster) {
  // Broadcast the current request count to all known workers.
  var broadcast = function () {
    // Index loop: for...in on an array walks keys, not just the elements.
    for (var w = 0; w < workers.length; w++) {
      workers[w].send({ cmd: 'broadcast', numReqs: numReqs });
    }
  };

  // Handle a command message sent by any worker.
  var onWorkerMessage = function (msg) {
    if (!msg || !msg.cmd) {
      return;
    }
    switch (msg.cmd) {
      case 'notifyRequest':
        numReqs++;
        break;
      case 'broadcast':
        broadcast();
        break;
    }
  };

  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();
    worker.on('message', onWorkerMessage);
    // Add the worker to an array of known workers
    workers.push(worker);
  }

  setInterval(function () {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function (msg) {
    switch (msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
        break;
    }
  });

  // Worker processes have a http server.
  http.Server(function (req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process: count the request first, then ask the
    // master to broadcast the updated total.
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}
Here's how I implemented a solution to a similar problem. By hooking into cluster.on('fork'), you can attach message handlers to workers as they are forked (rather than storing them in an array), which has the added advantage of dealing with cases where workers die or disconnect and a new worker is forked.
This snippet would send a message from the master to all workers: whenever any worker sends the master a message, the master relays it to every worker.
if (cluster.isMaster) {
  // Relay a message received from one worker to every worker currently
  // listed in cluster.workers.
  var messageRelay = function (msg) {
    Object.keys(cluster.workers).forEach(function (id) {
      cluster.workers[id].send(msg);
    });
  };

  // When a new worker process is forked, attach the handler
  // This handles cases where new worker processes are forked
  // on disconnect/exit, as below.
  // Registered before the first fork so no worker is missed.
  cluster.on('fork', function (worker) {
    worker.on('message', messageRelay);
  });

  // Fork a new worker whenever one disconnects.
  cluster.on('disconnect', function (worker) {
    cluster.fork();
  });

  // os.cpus is a function: call it to get the list of CPUs.
  // (Reading .length on the function itself yields 0, forking no workers.)
  var numCPUs = require('os').cpus().length;
  for (var i = 0; i < numCPUs; i++) {
    cluster.fork();
  }
}
else {
  // Define the handler before registering it; passing a not-yet-assigned
  // var would hand `undefined` to process.on.
  var messageHandler = function messageHandler(msg) {
    // Worker received message--do something
  };
  process.on('message', messageHandler);
}