node/lib/internal/cluster/round_robin_handle.js
Yash Ladha 6faa162f55
lib: changed functional logic in cluster schedulers
The free pool in the round_robin scheduler was implemented as an array, and
there were constant lookups on it when distributing load to the other workers
in the free pool. Reimplementing it as a Map is more performant than the
Array implementation. This was already done for `all` in the past, but `free`
was not converted at that time.

PR-URL: https://github.com/nodejs/node/pull/32505
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
2020-04-07 20:36:20 +02:00

126 lines
2.9 KiB
JavaScript

'use strict';
const {
ArrayIsArray,
Boolean,
Map,
} = primordials;
const assert = require('internal/assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const { constants } = internalBinding('tcp_wrap');
module.exports = RoundRobinHandle;
/**
 * Listening handle shared between workers: creates one server through
 * `net.createServer()` and keeps the bookkeeping used to pass its incoming
 * connections to registered workers.
 *
 * @param {*} key Stored as `this.key`; sent along with every 'newconn' message.
 * @param {*} address Host when `port >= 0`, otherwise handed to `listen()`
 *   unchanged (UNIX socket path).
 * @param {number} port Used when `fd` is not `>= 0` and `port >= 0`.
 * @param {*} addressType Not read by this constructor.
 * @param {number} fd Takes precedence over `port` when `fd >= 0`.
 * @param {number} flags Bit field; only `UV_TCP_IPV6ONLY` is inspected.
 */
function RoundRobinHandle(key, address, port, addressType, fd, flags) {
  this.key = key;
  this.all = new Map();   // worker.id -> worker, every registered worker.
  this.free = new Map();  // worker.id -> worker, workers awaiting a connection.
  this.handles = [];      // Connection handles not yet handed to a worker.
  this.handle = null;     // Listening handle; filled in on 'listening'.
  // `assert.fail` is the connection listener; presumably the server itself
  // must never see a connection because `onconnection` is replaced below.
  this.server = net.createServer(assert.fail);

  // Work out what to listen on first, then issue a single listen() call.
  let listenTarget;
  if (fd >= 0) {
    listenTarget = { fd };
  } else if (port >= 0) {
    listenTarget = {
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    };
  } else {
    listenTarget = address;  // UNIX socket path.
  }
  this.server.listen(listenTarget);

  this.server.once('listening', () => {
    // Take the raw handle over from the server and route its connections
    // through distribute(); the server object is dropped afterwards.
    const { server } = this;
    const listeningHandle = server._handle;
    this.handle = listeningHandle;
    listeningHandle.onconnection =
      (err, handle) => this.distribute(err, handle);
    server._handle = null;
    this.server = null;
  });
}
/**
 * Registers `worker` with this handle and reports the outcome through `send`.
 *
 * Once the handle is listening, `send(null, { sockname }, null)` is called
 * when the handle exposes `getsockname`, or `send(null, null, null)` when it
 * does not, and the worker is then offered any pending connection. While the
 * server is still binding, that reply is deferred to its 'listening' event
 * and an 'error' event is reported as `send(err.errno, null)`.
 *
 * @param {object} worker Worker to register; keyed by `worker.id`, which
 *   must not be registered yet (asserted).
 * @param {Function} send Reply callback.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const replyAndServe = () => {
    const { handle } = this;
    if (!handle.getsockname) {
      send(null, null, null);  // UNIX socket.
    } else {
      const sockname = {};
      handle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname }, null);
    }
    this.handoff(worker);  // In case there are connections pending.
  };

  const pendingServer = this.server;
  if (pendingServer === null) {
    // Already listening: answer right away.
    return replyAndServe();
  }

  // Still busy binding.
  pendingServer.once('listening', replyAndServe);
  pendingServer.once('error', (err) => {
    send(err.errno, null);
  });
};
/**
 * Unregisters `worker` from this handle. When the last registered worker is
 * removed, every queued connection handle and the listening handle are closed.
 *
 * @param {object} worker Worker to remove; looked up by `worker.id`.
 * @returns {boolean} `true` only when this call removed the last registered
 *   worker; `false` when the worker was not registered or others remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);
  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // Last worker is gone: nobody is left to take the queued connections.
  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  // `this.handle` stays null until the server has emitted 'listening', so a
  // worker that goes away while the server is still binding must not make
  // this throw on `null.close()`.
  // NOTE(review): in that case the still-binding server is not torn down
  // here; confirm whether the caller needs it closed as well.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }
  return true;
};
/**
 * Connection callback of the listening handle: queues an accepted connection
 * and, when a worker is waiting in the free pool, hands work to the first one.
 *
 * @param {number} err Truthy when the accept failed; such calls are ignored.
 * @param {object} handle Accepted connection handle to queue.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // A failed accept carries no usable connection handle. Queueing it anyway
  // would put a bogus entry into `this.handles`: handoff() would take it for
  // an empty queue and remove() would throw when trying to close it.
  if (err) {
    return;
  }

  this.handles.push(handle);

  // First entry of the free pool in insertion order, if there is one.
  const [ workerEntry ] = this.free;

  if (ArrayIsArray(workerEntry)) {
    const [ workerId, worker ] = workerEntry;
    // The worker is busy from now on; handoff() re-adds it once it is idle.
    this.free.delete(workerId);
    this.handoff(worker);
  }
};
/**
 * Gives `worker` the oldest queued connection, or parks the worker in the
 * free pool when nothing is queued. Does nothing for a worker that is no
 * longer registered.
 *
 * The connection is sent with a 'newconn' message. When the worker's reply
 * has `accepted` set, the local handle is closed; otherwise the connection
 * goes back through distribute(). Either way the worker is offered the next
 * connection afterwards.
 *
 * @param {object} worker Worker to serve; uses `worker.id` and
 *   `worker.process`.
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  const isRegistered = this.all.has(worker.id);
  if (!isRegistered) {
    return;  // Worker is closing (or has closed) the server.
  }

  const pending = this.handles.shift();
  if (pending === undefined) {
    // Nothing to serve right now: add to ready queue again.
    this.free.set(worker.id, worker);
    return;
  }

  const payload = { act: 'newconn', key: this.key };
  const onReply = (reply) => {
    if (!reply.accepted) {
      // Worker is shutting down. Send to another.
      this.distribute(0, pending);
    } else {
      pending.close();
    }
    this.handoff(worker);
  };

  sendHelper(worker.process, payload, pending, onReply);
};