node/lib/internal/cluster/round_robin_handle.js
cjihrig 2f885e7a43 cluster: refactor module into multiple files
This commit splits the existing cluster module into several
internal modules. More specifically, the cluster master and
worker implementations are separated, and the various data
structures are separated.

PR-URL: https://github.com/nodejs/node/pull/10746
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Sam Roberts <vieuxtech@gmail.com>
2017-01-13 17:01:07 -05:00

116 lines
2.9 KiB
JavaScript

'use strict';
// Used for invariant checks; `assert.fail` is also handed to
// net.createServer() as the connection listener.
const assert = require('assert');
// Provides the listening server whose underlying handle is taken over
// once it is bound.
const net = require('net');
// Sends a message (plus an optional handle) to a worker process and invokes
// a callback with the reply.
const { sendHelper } = require('internal/cluster/utils');
// Cached reference; used to count the workers still registered on a handle.
const getOwnPropertyNames = Object.getOwnPropertyNames;
// libuv error constants, indexed as `uv['UV_' + name]` to map an error name
// back to its numeric code.
const uv = process.binding('uv');
module.exports = RoundRobinHandle;
/**
 * Owns one listening socket and queues the connections accepted on it until
 * they can be handed to a registered worker.
 *
 * A `net.Server` is used only to bind the socket. As soon as it emits
 * 'listening', its underlying handle is taken over, connections are routed
 * to `distribute()`, and the server object is dropped (`this.server` becomes
 * `null`, which the other methods use as the "binding finished" signal).
 *
 * @param {*} key - Identifier for this handle; sent back to workers in
 *     `newconn` messages.
 * @param {string} address - Address to bind, or the UNIX socket path when
 *     neither `fd` nor `port` is usable.
 * @param {number} port - Port to bind; used when `port >= 0` and no `fd`.
 * @param {*} addressType - Accepted for interface compatibility; not used
 *     by this constructor.
 * @param {number} fd - Existing file descriptor to listen on when `fd >= 0`.
 */
function RoundRobinHandle(key, address, port, addressType, fd) {
  this.key = key;
  this.all = {};       // worker.id -> worker, for every registered worker.
  this.free = [];      // Workers waiting for a connection.
  this.handles = [];   // Accepted connections waiting for a worker.
  this.handle = null;  // Listening handle; set once the server is bound.

  const server = net.createServer(assert.fail);
  this.server = server;

  // Pick the listen() signature: fd first, then port/address, then path.
  let listenArgs;
  if (fd >= 0)
    listenArgs = [{ fd }];
  else if (port >= 0)
    listenArgs = [port, address];
  else
    listenArgs = [address];  // UNIX socket path.
  server.listen(...listenArgs);

  server.once('listening', () => {
    // Take the raw handle away from the server and accept on it directly.
    const listenHandle = server._handle;
    this.handle = listenHandle;
    listenHandle.onconnection = (err, handle) => this.distribute(err, handle);
    server._handle = null;
    this.server = null;
  });
}
/**
 * Registers `worker` with this handle and reports the outcome through `send`.
 *
 * If the socket is already bound the reply is sent immediately; otherwise it
 * is deferred until the pending server emits 'listening' or 'error'.
 *
 * @param {Object} worker - Worker to register; must have a unique `id` that
 *     is not registered yet (asserted).
 * @param {Function} send - Called as `send(null, { sockname }, null)` for a
 *     handle that supports `getsockname`, `send(null, null, null)` otherwise,
 *     or `send(errno, null)` when binding fails.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(worker.id in this.all === false);
  this.all[worker.id] = worker;

  const reportReady = () => {
    let reply = null;  // Stays null for handles without getsockname (UNIX).
    if (this.handle.getsockname) {
      const sockname = {};
      this.handle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      reply = { sockname };
    }
    send(null, reply, null);
    this.handoff(worker);  // In case there are connections pending.
  };

  const pendingServer = this.server;
  if (pendingServer === null)
    return reportReady();

  // Still busy binding.
  pendingServer.once('listening', reportReady);
  pendingServer.once('error', (err) => {
    // Hack: translate 'EADDRINUSE' error string back to numeric error code.
    // It works but ideally we'd have some backchannel between the net and
    // cluster modules for stuff like this.
    send(uv['UV_' + err.errno], null);
  });
};
/**
 * Unregisters `worker` from this handle. When it was the last registered
 * worker, every queued connection and the listening socket are closed.
 *
 * @param {Object} worker - Worker to remove, identified by `worker.id`.
 * @returns {boolean} `true` if that was the last worker and the handle has
 *     been shut down; `false` if the worker was unknown or others remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  if (worker.id in this.all === false)
    return false;

  delete this.all[worker.id];

  // The worker may be parked in the ready queue; make sure it is not picked.
  const index = this.free.indexOf(worker);
  if (index !== -1)
    this.free.splice(index, 1);

  if (getOwnPropertyNames(this.all).length !== 0)
    return false;

  // Nobody is left to serve the queued connections. Empty the queue in place
  // and close every real entry; a falsy entry must not stop the drain and
  // leave the connections behind it open.
  for (const pending of this.handles.splice(0)) {
    if (pending)
      pending.close();
  }

  if (this.handle) {
    this.handle.close();
    this.handle = null;
  } else if (this.server) {
    // The last worker went away before 'listening' fired, so there is no
    // handle to close yet. Stop the pending server instead of throwing.
    // `this.server` is left in place because the constructor's 'listening'
    // callback still dereferences it should the event fire later.
    this.server.close();
  }

  return true;
};
/**
 * Connection callback for the listening handle: queues an accepted
 * connection and, if a worker is waiting, starts handing it off.
 *
 * @param {number} err - Non-zero when accepting the connection failed;
 *     `handoff()` re-queues a rejected connection with `0`.
 * @param {Object} handle - The accepted connection handle.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // A failed accept carries no usable connection handle. Queueing it would
  // put `undefined` into `this.handles`, which `handoff()` reads as "queue
  // empty" and so would stall the connections queued behind it.
  if (err)
    return;

  this.handles.push(handle);

  const worker = this.free.shift();
  if (worker)
    this.handoff(worker);
};
/**
 * Gives `worker` the oldest queued connection, or parks the worker in the
 * ready queue when nothing is pending.
 *
 * The connection is sent in a `newconn` message. When the worker replies,
 * the local copy of an accepted connection is closed, a rejected one is
 * queued again for another worker, and the same worker is offered the next
 * connection.
 *
 * @param {Object} worker - Worker to serve; ignored if it is no longer
 *     registered on this handle.
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  const isRegistered = worker.id in this.all;
  if (!isRegistered)
    return;  // Worker is closing (or has closed) the server.

  const connection = this.handles.shift();
  if (connection === undefined) {
    this.free.push(worker);  // Add to ready queue again.
    return;
  }

  const onReply = (reply) => {
    if (!reply.accepted)
      this.distribute(0, connection);  // Worker is shutting down. Send to another.
    else
      connection.close();

    this.handoff(worker);
  };

  sendHelper(worker.process,
             { act: 'newconn', key: this.key },
             connection,
             onReply);
};