package-rebuilds/pkgs/node-undici/node-undici-5.28.4+dfsg1+~cs23.12.11/test/balanced-pool.js
Fabian Grünbichler afee63929c add node-undici for libgit2 1.8.1
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
2024-09-10 11:23:24 +02:00

566 lines
16 KiB
JavaScript

'use strict'
const { test } = require('tap')
const { BalancedPool, Pool, Client, errors } = require('..')
const { createServer } = require('http')
const { promisify } = require('util')
test('throws when factory is not a function', (t) => {
t.plan(2)
try {
new BalancedPool(null, { factory: '' }) // eslint-disable-line
} catch (err) {
t.type(err, errors.InvalidArgumentError)
t.equal(err.message, 'factory must be a function.')
}
})
test('add/remove upstreams', (t) => {
t.plan(7)
const upstream01 = 'http://localhost:1'
const upstream02 = 'http://localhost:2'
const pool = new BalancedPool()
t.same(pool.upstreams, [])
// try to remove non-existent upstream
pool.removeUpstream(upstream01)
t.same(pool.upstreams, [])
pool.addUpstream(upstream01)
t.same(pool.upstreams, [upstream01])
// try to add the same upstream
pool.addUpstream(upstream01)
t.same(pool.upstreams, [upstream01])
pool.addUpstream(upstream02)
t.same(pool.upstreams, [upstream01, upstream02])
pool.removeUpstream(upstream02)
t.same(pool.upstreams, [upstream01])
pool.removeUpstream(upstream01)
t.same(pool.upstreams, [])
})
test('basic get', async (t) => {
t.plan(16)
let server1Called = 0
const server1 = createServer((req, res) => {
server1Called++
t.equal('/', req.url)
t.equal('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.teardown(server1.close.bind(server1))
await promisify(server1.listen).call(server1, 0)
let server2Called = 0
const server2 = createServer((req, res) => {
server2Called++
t.equal('/', req.url)
t.equal('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.teardown(server2.close.bind(server2))
await promisify(server2.listen).call(server2, 0)
const client = new BalancedPool()
client.addUpstream(`http://localhost:${server1.address().port}`)
client.addUpstream(`http://localhost:${server2.address().port}`)
t.teardown(client.destroy.bind(client))
{
const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
t.equal('hello', await body.text())
}
{
const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
t.equal('hello', await body.text())
}
t.equal(server1Called, 1)
t.equal(server2Called, 1)
t.equal(client.destroyed, false)
t.equal(client.closed, false)
await client.close()
t.equal(client.destroyed, true)
t.equal(client.closed, true)
})
test('connect/disconnect event(s)', (t) => {
const clients = 2
t.plan(clients * 5)
const server = createServer((req, res) => {
res.writeHead(200, {
Connection: 'keep-alive',
'Keep-Alive': 'timeout=1s'
})
res.end('ok')
})
t.teardown(server.close.bind(server))
server.listen(0, () => {
const pool = new BalancedPool(`http://localhost:${server.address().port}`, {
connections: clients,
keepAliveTimeoutThreshold: 100
})
t.teardown(pool.close.bind(pool))
pool.on('connect', (origin, [pool, pool2, client]) => {
t.equal(client instanceof Client, true)
})
pool.on('disconnect', (origin, [pool, pool2, client], error) => {
t.ok(client instanceof Client)
t.type(error, errors.InformationalError)
t.equal(error.code, 'UND_ERR_INFO')
})
for (let i = 0; i < clients; i++) {
pool.request({
path: '/',
method: 'GET'
}, (err, { headers, body }) => {
t.error(err)
body.resume()
})
}
})
})
test('busy', (t) => {
t.plan(8 * 6 + 2 + 1)
const server = createServer((req, res) => {
t.equal('/', req.url)
t.equal('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.teardown(server.close.bind(server))
server.listen(0, async () => {
const client = new BalancedPool(`http://localhost:${server.address().port}`, {
connections: 2,
pipelining: 2
})
client.on('drain', () => {
t.pass()
})
client.on('connect', () => {
t.pass()
})
t.teardown(client.destroy.bind(client))
for (let n = 1; n <= 8; ++n) {
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
t.error(err)
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
const bufs = []
body.on('data', (buf) => {
bufs.push(buf)
})
body.on('end', () => {
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
})
})
}
})
})
test('factory option with basic get request', async (t) => {
t.plan(12)
let factoryCalled = 0
const opts = {
factory: (origin, opts) => {
factoryCalled++
return new Pool(origin, opts)
}
}
const client = new BalancedPool([], opts) // eslint-disable-line
let serverCalled = 0
const server = createServer((req, res) => {
serverCalled++
t.equal('/', req.url)
t.equal('GET', req.method)
res.setHeader('content-type', 'text/plain')
res.end('hello')
})
t.teardown(server.close.bind(server))
await promisify(server.listen).call(server, 0)
client.addUpstream(`http://localhost:${server.address().port}`)
t.same(client.upstreams, [`http://localhost:${server.address().port}`])
t.teardown(client.destroy.bind(client))
{
const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
t.equal(statusCode, 200)
t.equal(headers['content-type'], 'text/plain')
t.equal('hello', await body.text())
}
t.equal(serverCalled, 1)
t.equal(factoryCalled, 1)
t.equal(client.destroyed, false)
t.equal(client.closed, false)
await client.close()
t.equal(client.destroyed, true)
t.equal(client.closed, true)
})
test('throws when upstream is missing', async (t) => {
t.plan(2)
const pool = new BalancedPool()
try {
await pool.request({ path: '/', method: 'GET' })
} catch (e) {
t.type(e, errors.BalancedPoolMissingUpstreamError)
t.equal(e.message, 'No upstream has been added to the BalancedPool')
}
})
class TestServer {
constructor ({ config: { server, socketHangup, downOnRequests, socketHangupOnRequests }, onRequest }) {
this.config = {
downOnRequests: downOnRequests || [],
socketHangupOnRequests: socketHangupOnRequests || [],
socketHangup
}
this.name = server
// start a server listening to any port available on the host
this.port = 0
this.iteration = 0
this.requestsCount = 0
this.onRequest = onRequest
this.server = null
}
_shouldHangupOnClient () {
if (this.config.socketHangup) {
return true
}
if (this.config.socketHangupOnRequests.includes(this.requestsCount)) {
return true
}
return false
}
_shouldStopServer () {
if (this.config.upstreamDown === true || this.config.downOnRequests.includes(this.requestsCount)) {
return true
}
return false
}
async prepareForIteration (iteration) {
// set current iteration
this.iteration = iteration
if (this._shouldStopServer()) {
await this.stop()
} else if (!this.isRunning()) {
await this.start()
}
}
start () {
this.server = createServer((req, res) => {
if (this._shouldHangupOnClient()) {
req.destroy(new Error('(ツ)'))
return
}
this.requestsCount++
res.end('server is running!')
this.onRequest(this)
}).listen(this.port)
this.server.keepAliveTimeout = 2000
return new Promise((resolve) => {
this.server.on('listening', () => {
// store the used port to use it again if the server was stopped as part of test and then started again
this.port = this.server.address().port
return resolve()
})
})
}
isRunning () {
return !!this.server.address()
}
stop () {
if (!this.isRunning()) {
return
}
return new Promise(resolve => {
this.server.close(() => resolve())
})
}
}
const cases = [
// 0
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 7,
config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }],
expected: ['A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 0,
expectedSocketErrors: 0,
expectedRatios: [0.34, 0.33, 0.33]
},
// 1
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A', downOnRequests: [0] }, { server: 'B' }, { server: 'C' }],
expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A', 'B', 'C', 'A'],
expectedConnectionRefusedErrors: 1,
expectedSocketErrors: 0,
expectedRatios: [0.32, 0.34, 0.34]
},
// 2
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C' }],
expected: ['A', 'B/connectionRefused', 'C', 'A', 'C', 'A', 'C', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 1,
expectedSocketErrors: 0,
expectedRatios: [0.34, 0.32, 0.34]
},
// 3
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A' }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }],
expected: ['A', 'B/connectionRefused', 'C/connectionRefused', 'A', 'A', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 2,
expectedSocketErrors: 0,
expectedRatios: [0.35, 0.33, 0.32]
},
// 4
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0] }, { server: 'C', downOnRequests: [0] }],
expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 3,
expectedSocketErrors: 0,
expectedRatios: [0.34, 0.33, 0.33]
},
// 5
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A', downOnRequests: [0, 1, 2] }, { server: 'B', downOnRequests: [0, 1, 2] }, { server: 'C', downOnRequests: [0, 1, 2] }],
expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B', 'C', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 9,
expectedSocketErrors: 0,
expectedRatios: [0.34, 0.33, 0.33]
},
// 6
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A', downOnRequests: [0] }, { server: 'B', downOnRequests: [0, 1] }, { server: 'C', downOnRequests: [0] }],
expected: ['A/connectionRefused', 'B/connectionRefused', 'C/connectionRefused', 'A', 'B/connectionRefused', 'C', 'A', 'B', 'C', 'A', 'B', 'C', 'A', 'C', 'A', 'C', 'A', 'C', 'A', 'B'],
expectedConnectionRefusedErrors: 4,
expectedSocketErrors: 0,
expectedRatios: [0.36, 0.29, 0.35]
},
// 7
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A' }, { server: 'B' }, { server: 'C', downOnRequests: [1] }],
expected: ['A', 'B', 'C', 'A', 'B', 'C/connectionRefused', 'A', 'B', 'A', 'B', 'A', 'B', 'C', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 1,
expectedSocketErrors: 0,
expectedRatios: [0.34, 0.34, 0.32],
// Skip because the behavior of Node.js has changed
skip: true
},
// 8
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A', socketHangupOnRequests: [1] }, { server: 'B' }, { server: 'C' }],
expected: ['A', 'B', 'C', 'A/socketError', 'B', 'C', 'B', 'C', 'B', 'C', 'A'],
expectedConnectionRefusedErrors: 0,
expectedSocketErrors: 1,
expectedRatios: [0.32, 0.34, 0.34]
},
// 9
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 7,
config: [{ server: 'A' }, { server: 'B' }, { server: 'C' }, { server: 'D' }, { server: 'E' }],
expected: ['A', 'B', 'C', 'D', 'E', 'A', 'B', 'C', 'D', 'E'],
expectedConnectionRefusedErrors: 0,
expectedSocketErrors: 0,
expectedRatios: [0.2, 0.2, 0.2, 0.2, 0.2]
},
// 10
{
iterations: 100,
maxWeightPerServer: 100,
errorPenalty: 15,
config: [{ server: 'A', downOnRequests: [0, 1, 2, 3] }, { server: 'B' }, { server: 'C' }],
expected: ['A/connectionRefused', 'B', 'C', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'B', 'C', 'A/connectionRefused', 'B', 'C', 'A', 'B', 'C', 'A', 'B', 'C'],
expectedConnectionRefusedErrors: 4,
expectedSocketErrors: 0,
expectedRatios: [0.18, 0.41, 0.41]
}
]
for (const [index, { config, expected, expectedRatios, iterations = 9, expectedConnectionRefusedErrors = 0, expectedSocketErrors = 0, maxWeightPerServer, errorPenalty = 10, only = false, skip = false }] of cases.entries()) {
test(`weighted round robin - case ${index}`, { only, skip }, async (t) => {
// cerate an array to store succesfull reqeusts
const requestLog = []
// create instances of the test servers according to the config
const servers = config.map((serverConfig) => new TestServer({
config: serverConfig,
onRequest: (server) => {
requestLog.push(server.name)
}
}))
t.teardown(() => servers.map(server => server.stop()))
// start all servers to get a port so that we can build the upstream urls to supply them to undici
await Promise.all(servers.map(server => server.start()))
// build upstream urls
const urls = servers.map(server => `http://localhost:${server.port}`)
// add upstreams
const client = new BalancedPool(urls[0], { maxWeightPerServer, errorPenalty })
urls.slice(1).map(url => client.addUpstream(url))
let connectionRefusedErrors = 0
let socketErrors = 0
for (let i = 0; i < iterations; i++) {
// setup test servers for the next iteration
await Promise.all(servers.map(server => server.prepareForIteration(i)))
// send a request using undinci
try {
await client.request({ path: '/', method: 'GET' })
} catch (e) {
const serverWithError =
servers.find(server => server.port === e.port) ||
servers.find(server => {
if (typeof AggregateError === 'function' && e instanceof AggregateError) {
return e.errors.some(e => server.port === (e.socket?.remotePort ?? e.port))
}
return server.port === e.socket.remotePort
})
serverWithError.requestsCount++
if (e.code === 'ECONNREFUSED') {
requestLog.push(`${serverWithError.name}/connectionRefused`)
connectionRefusedErrors++
}
if (e.code === 'UND_ERR_SOCKET') {
requestLog.push(`${serverWithError.name}/socketError`)
socketErrors++
}
}
}
const totalRequests = servers.reduce((acc, server) => {
return acc + server.requestsCount
}, 0)
t.equal(totalRequests, iterations)
t.equal(connectionRefusedErrors, expectedConnectionRefusedErrors)
t.equal(socketErrors, expectedSocketErrors)
if (expectedRatios) {
const ratios = servers.reduce((acc, el) => {
acc[el.name] = 0
return acc
}, {})
requestLog.map(el => ratios[el[0]]++)
t.match(Object.keys(ratios).map(k => ratios[k] / iterations), expectedRatios)
}
if (expected) {
t.match(requestLog.slice(0, expected.length), expected)
}
await client.close()
})
}