NodeJS|Cluster:如何将数据从 master 发送到所有或单个 child/workers?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/8534462/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-02 14:54:09  来源:igfitidea点击:

NodeJS|Cluster: How to send data from master to all or single child/workers?

node.jscluster-computing

提问by htonus

I have working (stock) script from node

我有来自节点的工作(库存)脚本

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd && msg.cmd == 'notifyRequest') {
        numReqs++;
      }
    });
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

In the above script I can send data from worker to master process with ease. But how to send data from master to the worker/workers? With examples, if it possible.

在上面的脚本中,我可以轻松地将数据从工作进程发送到主进程。但是如何将数据从主人发送到工人/工人?如果可能的话,举个例子。

回答by alessioalex

Because cluster.fork is implemented on top of child_process.fork, you can send messages from a master to the worker by using worker.send({ msg: 'test' }), and from a worker to a master by process.send({ msg: 'test' });. You receive the messages like so: worker.on('message', callback)(from worker to master) and process.on('message', callback);(from master to worker).

由于 cluster.fork 是在child_process.fork之上实现的,因此您可以使用 将消息从 master 发送到 worker worker.send({ msg: 'test' }),从 worker 发送到 master process.send({ msg: 'test' });。你会收到这样的消息:(worker.on('message', callback)从工人到主人)和process.on('message', callback);(从主人到工人)。

Here's my full example, you can test it by browsing http://localhost:8000/Then the worker will send a message to the master and the master will reply:

这是我的完整示例,您可以通过浏览http://localhost:8000/ 进行测试,然后 worker 会向 master 发送消息,master 会回复:

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var worker;

if (cluster.isMaster) {
  // Fork workers.
  for (var i = 0; i < 2; i++) {
    worker = cluster.fork();

    worker.on('message', function(msg) {
      // we only want to intercept messages that have a chat property
      if (msg.chat) {
        console.log('Worker to master: ', msg.chat);
        worker.send({ chat: 'Ok worker, Master got the message! Over and out!' });
      }
    });

  }
} else {
  process.on('message', function(msg) {
    // we only want to intercept messages that have a chat property
    if (msg.chat) {
      console.log('Master to worker: ', msg.chat);
    }
  });
  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ chat: 'Hey master, I got a new request!' });
  }).listen(8000);
}

回答by Kevin Reilly

I found this thread while looking for a way to send a message to all child processes and was thankfully able to figure it out thanks to the comments about arrays. Just wanted to illustrate a potential solution for sending a message to all child processes utilizing this approach.

我在寻找一种向所有子进程发送消息的方法时发现了这个线程,幸好由于有关数组的评论,我能够弄清楚。只是想说明使用这种方法向所有子进程发送消息的潜在解决方案。

var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
var workers = [];

if (cluster.isMaster) {
  // Broadcast a message to all workers
  var broadcast = function() {
    for (var i in workers) {
      var worker = workers[i];
      worker.send({ cmd: 'broadcast', numReqs: numReqs });
    }
  }

  // Fork workers.
  for (var i = 0; i < 2; i++) {
    var worker = cluster.fork();

    worker.on('message', function(msg) {
      if (msg.cmd) {
        switch (msg.cmd) {
          case 'notifyRequest':
            numReqs++;
          break;
          case 'broadcast':
            broadcast();
          break;
        }
    });

    // Add the worker to an array of known workers
    workers.push(worker);
  }

  setInterval(function() {
    console.log("numReqs =", numReqs);
  }, 1000);
} else {
  // React to messages received from master
  process.on('message', function(msg) {
    switch(msg.cmd) {
      case 'broadcast':
        if (msg.numReqs) console.log('Number of requests: ' + msg.numReqs);
      break;
    }
  });

  // Worker processes have a http server.
  http.Server(function(req, res) {
    res.writeHead(200);
    res.end("hello world\n");
    // Send message to master process
    process.send({ cmd: 'notifyRequest' });
    process.send({ cmd: 'broadcast' });
  }).listen(8000);
}

回答by LiquidPony

Here's how I implemented a solution to a similar problem. By hooking into cluster.on('fork'), you can attach message handlers to workers as they are forked (rather than storing them in an array), which has the added advantage of dealing with cases where workers die or disconnect and a new worker is forked.

这是我如何实施类似问题的解决方案。通过连接到cluster.on('fork'),您可以在分叉工人时将消息处理程序附加到他们(而不是将它们存储在数组中),这具有处理工人死亡或断开连接以及新工人分叉的情况的额外优势。

This snippet would send a message from the master to allworkers.

此代码段将从 master 向所有worker发送消息。

if (cluster.isMaster) {
    for (var i = 0; i < require('os').cpus.length; i++) {
        cluster.fork();
    }

    cluster.on('disconnect', function(worker) {
        cluster.fork();
    }

    // When a new worker process is forked, attach the handler
    // This handles cases where new worker processes are forked
    // on disconnect/exit, as above.
    cluster.on('fork', function(worker) {
        worker.on('message', messageRelay);
    }

    var messageRelay = function(msg) {
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send(msg);
        });
    };
}
else {
    process.on('message', messageHandler);

    var messageHandler = function messageHandler(msg) {
        // Worker received message--do something
    };
}

回答by Raman

I understand your purpose of broadcasting to all the node worker processes in a cluster, although you can not send socket component as such but there is a work around for the purpose to be served. I will try an explain with an example :

我了解您向集群中的所有节点工作进程广播的目的,尽管您不能发送套接字组件,但有一种解决方法可以达到目的。我将尝试用一个例子来解释:

Step 1 : When a client action requires a broadcast :

第 1 步:当客户端操作需要广播时:

Child.js (Process that has been forked) :

socket.on("BROADCAST_TO_ALL_WORKERS", function (data) 
{
    process.send({cmd : 'BROADCAST_TO_ALL_WORKERS', message :data.message});
}) 

Step 2 : On the cluster creation side

第 2 步:在集群创建端

Server.js (Place where cluster forking happens):

if (cluster.isMaster) {

  for (var i = 0; i < numCPUs; i++) {

    var worker = cluster.fork();

    worker.on('message', function (data) {
     if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
       console.log(server_debug_prefix() + "Server Broadcast To All, Message : " + data.message + " , Reload : " + data.reload + " Player Id : " + data.player_id);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  }

  cluster.on('exit', function (worker, code, signal) {
    var newWorker = cluster.fork();
    newWorker.on('message', function (data) {
      console.log(data);
      if (data.cmd === "BROADCAST_TO_ALL_WORKERS") {
        console.log(data.cmd,data);
        Object.keys(cluster.workers).forEach(function(id) {
            cluster.workers[id].send({cmd : "BROADCAST_TO_WORKER", message : data.message});
        });
      }
    });
  });
} 
else {
  //Node Js App Entry
  require("./Child.js");
}

Step 3: To Broadcast in the child process -

第 3 步:在子进程中进行广播 -

-> Put this before io.on("connection") in Child.js

-> 将其放在 Child.js 中的 io.on("connection") 之前

process.on("message", function(data){
    if(data.cmd === "BROADCAST_TO_WORKER"){
        io.sockets.emit("SERVER_MESSAGE", { message: data.message, reload: data.reload, player_id : data.player_id });
    }
});

I hope this helps. Please let me know if more clarification is required.

我希望这有帮助。如果需要更多说明,请告诉我。

回答by cheng81

You should be able to send a message from the master to the worker like this:

您应该能够像这样从主人向工作人员发送消息:

worker.send({message:'hello'})

because "cluster.fork is implemented on top of child_process.fork" (cluster.fork is implemented on top of child_process.fork)

因为“cluster.fork 是在 child_process.fork 之上实现的”(cluster.fork 是在 child_process.fork 之上实现的)