起源, 从官方实例中看多进程共用端口
- const cluster = require('cluster');
- const http = require('http');
- const numCPUs = require('os').cpus().length;
- if (cluster.isMaster) {
- console.log(`Master ${process.pid} is running`);
- for (let i = 0; i <numCPUs; i++) {
- cluster.fork();
- }
- cluster.on('exit', (worker, code, signal) => {
- console.log(`worker ${worker.process.pid} died`);
- });
- } else {
- http.createServer((req, res) => {
- res.writeHead(200);
- res.end('hello world\n');
- }).listen(8000);
- console.log(`Worker ${process.pid} started`);
- }
执行结果:
- $ node server.js
- Master 3596 is running
- Worker 4324 started
- Worker 4520 started
- Worker 6056 started
- Worker 5644 started
了解 http.js 模块:
我们都只有要创建一个 http 服务, 必须引用 http 模块, http 模块最终会调用 net.js 实现网络服务
- // lib/net.js
- 'use strict';
- ...
- Server.prototype.listen = function(...args) {
- ...
- if (options instanceof TCP) {
- this._handle = options;
- this[async_id_symbol] = this._handle.getAsyncId();
- listenInCluster(this, null, -1, -1, backlogFromArgs); // 注意这个方法调用了 cluster 模式下的处理办法
- return this;
- }
- ...
- };
- function listenInCluster(server, address, port, addressType,backlog, fd, exclusive) {
- // 如果是 master 进程或者没有开启 cluster 模式直接启动 listen
- if (cluster.isMaster || exclusive) {
- //_listen2, 细心的人一定会发现为什么是 listen2 而不直接使用 listen
- // _listen2 包裹了 listen 方法, 如果是 Worker 进程, 会调用被 hack 后的 listen 方法, 从而避免出错端口被占用的错误
- server._listen2(address, port, addressType, backlog, fd);
- return;
- }
- const serverQuery = {
- address: address,
- port: port,
- addressType: addressType,
- fd: fd,
- flags: 0
- };
- // 是 fork 出来的进程, 获取 master 上的 handel, 并且监听,
- // 现在是不是很好奇_getServer 方法做了什么
- cluster._getServer(server, serverQuery, listenOnMasterHandle);
- }
- ...
答案很快就可以通过 cluster._getServer 这个函数找到
代理了 server._listen2 这个方法在 work 进程的执行操作
向 master 发送 queryServer 消息, 向 master 注册一个内部 TCP 服务器
- // lib/internal/cluster/child.js
- cluster._getServer = function(obj, options, cb) {
- // ...
- const message = util._extend({
- act: 'queryServer', // 关键点: 构建一个 queryServer 的消息
- index: indexes[indexesKey],
- data: null
- }, options);
- message.address = address;
- // 发送 queryServer 消息给 master 进程, master 在收到这个消息后, 会创建一个开始一个 server, 并且 listen
- send(message, (reply, handle) => {
- rr(reply, indexesKey, cb); // Round-robin.
- });
- obj.once('listening', () => {
- cluster.worker.state = 'listening';
- const address = obj.address();
- message.act = 'listening';
- message.port = address && address.port || options.port;
- send(message);
- });
- };
- //...
- // Round-robin. Master distributes handles across workers.
- function rr(message, indexesKey, cb) {
- if (message.errno) return cb(message.errno, null);
- var key = message.key;
- // 这里 hack 了 listen 方法
- // 子进程调用的 listen 方法, 就是这个, 直接返回 0, 所以不会报端口被占用的错误
- function listen(backlog) {
- return 0;
- }
- // ...
- const handle = { close, listen, ref: noop, unref: noop };
- handles[key] = handle;
- // 这个 cb 函数是 net.js 中的 listenOnMasterHandle 方法
- cb(0, handle);
- }
- // lib/net.js
- /*
- function listenOnMasterHandle(err, handle) {
- err = checkBindError(err, port, handle);
- server._handle = handle;
- // _listen2 函数中, 调用的 handle.listen 方法, 也就是上面被 hack 的 listen
- server._listen2(address, port, addressType, backlog, fd);
- }
- */
master 进程收到 queryServer 消息后进行启动服务
如果地址没被监听过, 通过 RoundRobinHandle 监听开启服务
如果地址已经被监听, 直接绑定 handel 到已经监听到服务上, 去消费请求
- // lib/internal/cluster/master.js
- function queryServer(worker, message) {
- const args = [
- message.address,
- message.port,
- message.addressType,
- message.fd,
- message.index
- ];
- const key = args.join(':');
- var handle = handles[key];
- // 如果地址没被监听过, 通过 RoundRobinHandle 监听开启服务
- if (handle === undefined) {
- var constructor = RoundRobinHandle;
- if (schedulingPolicy !== SCHED_RR ||
- message.addressType === 'udp4' ||
- message.addressType === 'udp6') {
- constructor = SharedHandle;
- }
- handles[key] = handle = new constructor(key,
- address,
- message.port,
- message.addressType,
- message.fd,
- message.flags);
- }
- // 如果地址已经被监听, 直接绑定 handel 到已经监听到服务上, 去消费请求
- // Set custom server data
- handle.add(worker, (errno, reply, handle) => {
- reply = util._extend({
- errno: errno,
- key: key,
- ack: message.seq,
- data: handles[key].data
- }, reply);
- if (errno)
- delete handles[key]; // Gives other workers a chance to retry.
- send(worker, reply, handle);
- });
- }
看到这一步, 已经很明显, 我们知道了多进行端口共享的实现原理
其实端口仅由 master 进程中的内部 TCP 服务器监听了一次
因为 net.js 模块中会判断当前的进程是 master 还是 Worker 进程
如果是 Worker 进程调用 cluster._getServer 去 hack 原生的 listen 方法
所以在 child 调用的 listen 方法, 是一个 return 0 的空方法, 所以不会报端口占用错误
那现在问题来了, 既然 Worker 进程是如何获取到 master 进程监听服务接收到的 connect 呢?
监听 master 进程启动的 TCP 服务器的 connection 事件
通过轮询挑选出一个 worker
向其发送 newconn 内部消息, 消息体中包含了客户端句柄
有了句柄, 谁都知道要怎么处理了哈哈
- // lib/internal/cluster/round_robin_handle.js
- function RoundRobinHandle(key, address, port, addressType, fd) {
- this.server = net.createServer(assert.fail);
- if (fd>= 0)
- this.server.listen({ fd });
- else if (port>= 0)
- this.server.listen(port, address);
- else
- this.server.listen(address); // UNIX socket path.
- this.server.once('listening', () => {
- this.handle = this.server._handle;
- // 监听 onconnection 方法
- this.handle.onconnection = (err, handle) => this.distribute(err, handle);
- this.server._handle = null;
- this.server = null;
- });
- }
- RoundRobinHandle.prototype.add = function (worker, send) {
- // ...
- };
- RoundRobinHandle.prototype.remove = function (worker) {
- // ...
- };
- RoundRobinHandle.prototype.distribute = function (err, handle) {
- // 负载均衡地挑选出一个 worker
- this.handles.push(handle);
- const worker = this.free.shift();
- if (worker) this.handoff(worker);
- };
- RoundRobinHandle.prototype.handoff = function (worker) {
- const handle = this.handles.shift();
- const message = { act: 'newconn', key: this.key };
- // 向 work 进程其发送 newconn 内部消息和客户端的句柄 handle
- sendHelper(worker.process, message, handle, (reply) => {
- // ...
- this.handoff(worker);
- });
- };
下面让我们看看 Worker 进程接收到 newconn 消息后进行了哪些操作
- // lib/child.js
- function onmessage(message, handle) {
- if (message.act === 'newconn')
- onconnection(message, handle);
- else if (message.act === 'disconnect')
- _disconnect.call(worker, true);
- }
- // Round-robin connection.
- // 接收连接, 并且处理
- function onconnection(message, handle) {
- const key = message.key;
- const server = handles[key];
- const accepted = server !== undefined;
- send({ ack: message.seq, accepted });
- if (accepted) server.onconnection(0, handle);
- }
总结
net 模块会对进程进行判断, 是 worker 还是 master, 是 worker 的话进行 hack net.Server 实例的 listen 方法
worker 调用的 listen 方法是 hack 掉的, 直接 return 0, 不过会向 master 注册一个 connection 接手的事件
master 收到客户端 connection 事件后, 会轮询向 worker 发送 connection 上来的客户端句柄
worker 收到 master 发送过来客户端的句柄, 这时候就可以处理客户端请求了
分享出于共享学习的目的, 如有错误, 欢迎大家留言指导, 不喜勿喷.
来源: http://www.jianshu.com/p/0843ae261680