package-rebuilds/pkgs/node-undici/node-undici-5.28.4+dfsg1+~cs23.12.11/test/pool.js
Fabian Grünbichler afee63929c add node-undici for libgit2 1.8.1
Signed-off-by: Fabian Grünbichler <f.gruenbichler@proxmox.com>
2024-09-10 11:23:24 +02:00

1102 lines
24 KiB
JavaScript

'use strict'
const { EventEmitter } = require('events')
const { createServer } = require('http')
const net = require('net')
const {
finished,
PassThrough,
Readable
} = require('stream')
const { promisify } = require('util')
const proxyquire = require('proxyquire')
const { test } = require('tap')
const {
kBusy,
kPending,
kRunning,
kSize,
kUrl
} = require('../lib/core/symbols')
const {
Client,
Pool,
errors
} = require('..')
// A non-finite `connections` value (NaN, produced here by 0 / 0) must be
// rejected by the Pool constructor with an InvalidArgumentError.
test('throws when connections is not finite', (t) => {
  t.plan(2)

  try {
    new Pool(null, { connections: 0 / 0 }) // eslint-disable-line no-new
  } catch (e) {
    // Both planned assertions live here, so the plan is only met if the
    // constructor actually throws.
    t.type(e, errors.InvalidArgumentError)
    t.equal(e.message, 'invalid connections')
  }
})
// A negative `connections` option must make the Pool constructor throw.
test('throws when connections is negative', (t) => {
  t.plan(2)

  const options = { connections: -1 }
  try {
    new Pool(null, options) // eslint-disable-line no-new
  } catch (err) {
    // The plan is only satisfied when this branch runs.
    t.type(err, errors.InvalidArgumentError)
    t.equal(err.message, 'invalid connections')
  }
})
// A boolean `connections` option must make the Pool constructor throw.
test('throws when connection is not number', (t) => {
  t.plan(2)

  const options = { connections: true }
  try {
    new Pool(null, options) // eslint-disable-line no-new
  } catch (err) {
    // The plan is only satisfied when this branch runs.
    t.type(err, errors.InvalidArgumentError)
    t.equal(err.message, 'invalid connections')
  }
})
// A string `factory` option must make the Pool constructor throw.
test('throws when factory is not a function', (t) => {
  t.plan(2)

  const options = { factory: '' }
  try {
    new Pool(null, options) // eslint-disable-line no-new
  } catch (err) {
    // The plan is only satisfied when this branch runs.
    t.type(err, errors.InvalidArgumentError)
    t.equal(err.message, 'factory must be a function.')
  }
})
// Constructing a Pool with a function-valued `connect` option must succeed.
test('does not throw when connect is a function', (t) => {
  t.plan(1)

  const origin = 'http://localhost'
  const options = { connect: () => {} }
  t.doesNotThrow(() => new Pool(origin, options))
})
// Checks the 'connect' and 'disconnect' events emitted by the pool while
// serving one request per configured connection.
test('connect/disconnect event(s)', (t) => {
  const clients = 2
  // Per client: 1 'connect' assertion, 4 'disconnect' assertions and
  // 1 assertion in the request callback.
  t.plan(clients * 6)

  const respondOk = (req, res) => {
    res.writeHead(200, {
      Connection: 'keep-alive',
      'Keep-Alive': 'timeout=1s'
    })
    res.end('ok')
  }
  const server = createServer(respondOk)
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const origin = `http://localhost:${server.address().port}`
    const pool = new Pool(origin, {
      connections: clients,
      keepAliveTimeoutThreshold: 100
    })
    t.teardown(pool.close.bind(pool))

    // The second event argument is a list whose second entry is the client.
    pool.on('connect', (url, [, connected]) => {
      t.equal(connected instanceof Client, true)
    })

    pool.on('disconnect', (url, [, disconnected], reason) => {
      t.ok(disconnected instanceof Client)
      t.type(reason, errors.InformationalError)
      t.equal(reason.code, 'UND_ERR_INFO')
      t.equal(reason.message, 'socket idle timeout')
    })

    for (let sent = 0; sent < clients; sent++) {
      pool.request({ path: '/', method: 'GET' }, (err, { body }) => {
        t.error(err)
        body.resume()
      })
    }
  })
})
// Issues a single GET through a Pool, then walks the close -> destroy ->
// close lifecycle and checks the `closed` / `destroyed` flags along the way.
test('basic get', (t) => {
  t.plan(14)

  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    res.setHeader('content-type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const origin = `http://localhost:${server.address().port}`
    const pool = new Pool(origin)
    t.teardown(pool.destroy.bind(pool))

    t.equal(pool[kUrl].origin, origin)

    const requestOptions = { path: '/', method: 'GET' }
    pool.request(requestOptions, (err, { statusCode, headers, body }) => {
      t.error(err)
      t.equal(statusCode, 200)
      t.equal(headers['content-type'], 'text/plain')

      const chunks = []
      body.on('data', (chunk) => {
        chunks.push(chunk)
      })
      body.on('end', () => {
        t.equal('hello', Buffer.concat(chunks).toString('utf8'))
      })
    })

    t.equal(pool.destroyed, false)
    t.equal(pool.closed, false)

    pool.close((closeErr) => {
      t.error(closeErr)
      t.equal(pool.destroyed, true)

      pool.destroy((destroyErr) => {
        t.error(destroyErr)

        // Closing again after destroy is expected to report an error.
        pool.close((lateCloseErr) => {
          t.type(lateCloseErr, errors.ClientDestroyedError)
        })
      })
    })

    // Asserted synchronously, right after close() was called.
    t.equal(pool.closed, true)
  })
})
// Same round trip as a basic GET, but the Pool is constructed from a URL
// object instead of a string.
test('URL as arg', (t) => {
  t.plan(9)

  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    res.setHeader('content-type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const target = new URL('http://localhost')
    target.port = server.address().port

    const pool = new Pool(target)
    t.teardown(pool.destroy.bind(pool))

    const requestOptions = { path: '/', method: 'GET' }
    pool.request(requestOptions, (err, { statusCode, headers, body }) => {
      t.error(err)
      t.equal(statusCode, 200)
      t.equal(headers['content-type'], 'text/plain')

      const chunks = []
      body.on('data', (chunk) => {
        chunks.push(chunk)
      })
      body.on('end', () => {
        t.equal('hello', Buffer.concat(chunks).toString('utf8'))
      })
    })

    pool.close((closeErr) => {
      t.error(closeErr)

      pool.destroy((destroyErr) => {
        t.error(destroyErr)

        // Closing again after destroy is expected to report an error.
        pool.close((lateCloseErr) => {
          t.type(lateCloseErr, errors.ClientDestroyedError)
        })
      })
    })
  })
})
// The server destroys every response, so the promise-style request is
// expected to reject; afterwards close() on a destroyed pool must reject too.
test('basic get error async/await', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    res.destroy()
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.destroy.bind(pool))

    try {
      await pool.request({ path: '/', method: 'GET' })
    } catch (err) {
      t.ok(err)
    }

    await pool.destroy()

    try {
      await pool.close()
    } catch (err) {
      t.type(err, errors.ClientDestroyedError)
    }
  })
})
// Promise-style GET: awaits the response, drains the body, then closes and
// destroys the pool.
test('basic get with async/await', async (t) => {
  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    res.setHeader('content-type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  const listen = promisify(server.listen.bind(server))
  await listen(0)

  const pool = new Pool(`http://localhost:${server.address().port}`)
  t.teardown(pool.destroy.bind(pool))

  const response = await pool.request({ path: '/', method: 'GET' })
  const { statusCode, headers, body } = response
  t.equal(statusCode, 200)
  t.equal(headers['content-type'], 'text/plain')

  // Discard the payload and wait for the body stream to finish.
  body.resume()
  const untilFinished = promisify(finished)
  await untilFinished(body)

  await pool.close()
  await pool.destroy()
})
// Promise-style stream(): the factory receives the response metadata and
// hands back a PassThrough as the destination.
test('stream get async/await', async (t) => {
  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    res.setHeader('content-type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  const listen = promisify(server.listen.bind(server))
  await listen(0)

  const pool = new Pool(`http://localhost:${server.address().port}`)
  t.teardown(pool.destroy.bind(pool))

  const makeDestination = ({ statusCode, headers }) => {
    t.equal(statusCode, 200)
    t.equal(headers['content-type'], 'text/plain')
    return new PassThrough()
  }
  await pool.stream({ path: '/', method: 'GET' }, makeDestination)
})
// The server destroys every response, so the promise returned by stream()
// is expected to reject.
test('stream get error async/await', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.destroy()
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.destroy.bind(pool))

    try {
      await pool.stream({ path: '/', method: 'GET' }, () => {})
    } catch (err) {
      t.ok(err)
    }
  })
})
// Runs a GET through pipeline() and collects the response payload from the
// returned stream.
test('pipeline get', (t) => {
  t.plan(5)

  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    res.setHeader('content-type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.destroy.bind(pool))

    const chunks = []
    const passBodyThrough = ({ statusCode, headers, body }) => {
      t.equal(statusCode, 200)
      t.equal(headers['content-type'], 'text/plain')
      return body
    }

    const duplex = pool.pipeline({ path: '/', method: 'GET' }, passBodyThrough)
    // No request payload: end the writable side immediately.
    duplex.end()
    duplex.on('data', (chunk) => {
      chunks.push(chunk)
    })
    duplex.on('end', () => {
      t.equal('hello', Buffer.concat(chunks).toString('utf8'))
    })
  })
})
// Drives Pool#dispatch against a stubbed client class and checks, by client
// id, which client each dispatched request was handed to.
test('backpressure algorithm', (t) => {
// Every dispatch received by any FakeClient, in call order.
const seen = []
// Number of FakeClient instances constructed so far; also the next id.
let total = 0
// Value returned by FakeClient#dispatch; false simulates backpressure.
let writeMore = true
// Stand-in for the real client: records each dispatch and returns `writeMore`.
class FakeClient extends EventEmitter {
constructor () {
super()
this.id = total++
}
dispatch (req, handler) {
seen.push({ req, client: this, id: this.id })
return writeMore
}
}
// Load the pool module with its './client' dependency replaced by FakeClient.
// This `Pool` shadows the module-level one for the rest of the test.
const Pool = proxyquire('../lib/pool', {
'./client': FakeClient
})
// Handler that rethrows any error reported to it.
const noopHandler = {
onError (err) {
throw err
}
}
const pool = new Pool('http://notahost')
// While dispatch returns true, both requests are expected on client 0.
pool.dispatch({}, noopHandler)
pool.dispatch({}, noopHandler)
const d1 = seen.shift() // d1 = c0
t.equal(d1.id, 0)
const d2 = seen.shift() // d2 = c0
t.equal(d2.id, 0)
t.equal(d1.id, d2.id)
// From here on dispatch returns false: the next request is still expected on
// client 0, the one after that on a different client (id 1).
writeMore = false
pool.dispatch({}, noopHandler) // d3 = c0
pool.dispatch({}, noopHandler) // d4 = c1
const d3 = seen.shift()
t.equal(d3.id, 0)
const d4 = seen.shift()
t.equal(d4.id, 1)
t.equal(d3.id, d2.id)
t.not(d3.id, d4.id)
// After a client emits 'drain', the following dispatch is expected to go to
// that same client.
writeMore = true
d4.client.emit('drain', new URL('http://notahost'), [])
pool.dispatch({}, noopHandler) // d5 = c1
d3.client.emit('drain', new URL('http://notahost'), [])
pool.dispatch({}, noopHandler) // d6 = c0
const d5 = seen.shift()
t.equal(d5.id, 1)
const d6 = seen.shift()
t.equal(d6.id, 0)
t.equal(d5.id, d4.id)
t.equal(d3.id, d6.id)
// Three FakeClient instances were constructed even though only ids 0 and 1
// received dispatches.
// NOTE(review): presumably the pool creates another client once an existing
// one reports backpressure — confirm against lib/pool.
t.equal(total, 3)
t.end()
})
// Queues eight requests on a two-connection pool and checks the pool's
// counters synchronously after each request is issued.
test('busy', (t) => {
  // 8 requests x 16 assertions each (2 in the server handler, 4 in the
  // response callback, 10 synchronous state checks), plus 2 + 1 passes from
  // the event listeners below.
  // NOTE(review): presumably two 'connect' events and one 'drain' — confirm.
  t.plan(8 * 16 + 2 + 1)

  const server = createServer((req, res) => {
    t.equal('/', req.url)
    t.equal('GET', req.method)
    res.setHeader('content-type', 'text/plain')
    res.end('hello')
  })
  t.teardown(server.close.bind(server))

  const connections = 2

  server.listen(0, async () => {
    const origin = `http://localhost:${server.address().port}`
    const pool = new Pool(origin, {
      connections,
      pipelining: 2
    })

    pool.on('drain', () => {
      t.pass()
    })
    pool.on('connect', () => {
      t.pass()
    })

    t.teardown(pool.destroy.bind(pool))

    const onResponse = (err, { statusCode, headers, body }) => {
      t.error(err)
      t.equal(statusCode, 200)
      t.equal(headers['content-type'], 'text/plain')

      const chunks = []
      body.on('data', (chunk) => {
        chunks.push(chunk)
      })
      body.on('end', () => {
        t.equal('hello', Buffer.concat(chunks).toString('utf8'))
      })
    }

    for (let n = 1; n <= 8; ++n) {
      pool.request({ path: '/', method: 'GET' }, onResponse)

      // Checked before any I/O has had a chance to run: nothing is running
      // or connected yet, and everything issued so far is still pending.
      const expectedQueued = Math.max(n - connections, 0)
      t.equal(pool[kPending], n)
      t.equal(pool[kBusy], n > 1)
      t.equal(pool[kSize], n)
      t.equal(pool[kRunning], 0)
      t.equal(pool.stats.connected, 0)
      t.equal(pool.stats.free, 0)
      t.equal(pool.stats.queued, expectedQueued)
      t.equal(pool.stats.pending, n)
      t.equal(pool.stats.size, n)
      t.equal(pool.stats.running, 0)
    }
  })
})
// dispatch() must throw InvalidArgumentError when the handler is missing or
// is an object without the required callbacks.
test('invalid pool dispatch options', (t) => {
  t.plan(2)

  const pool = new Pool('http://notahost')
  const expected = errors.InvalidArgumentError

  t.throws(() => pool.dispatch({}), expected, 'throws on invalid handler')
  t.throws(() => pool.dispatch({}, {}), expected, 'throws on invalid handler')
})
// Promise-style upgrade(): a raw TCP server answers with a 101 response
// followed by a payload that must arrive on the upgraded socket.
test('pool upgrade promise', (t) => {
  t.plan(2)

  // Written verbatim, one write per entry, whenever the server receives data.
  const responseParts = [
    'HTTP/1.1 101\r\n',
    'hello: world\r\n',
    'connection: upgrade\r\n',
    'upgrade: websocket\r\n',
    '\r\n',
    'Body'
  ]

  const server = net.createServer((conn) => {
    conn.on('data', () => {
      for (const part of responseParts) {
        conn.write(part)
      }
    })

    conn.on('end', () => {
      conn.end()
    })
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.close.bind(pool))

    const upgraded = await pool.upgrade({
      path: '/',
      method: 'GET',
      protocol: 'Websocket'
    })
    const { headers, socket } = upgraded

    let received = ''
    socket.on('data', (chunk) => {
      received += chunk
    })

    socket.on('close', () => {
      t.equal(received.toString(), 'Body')
    })

    t.same(headers, {
      hello: 'world',
      connection: 'upgrade',
      upgrade: 'websocket'
    })

    socket.end()
  })
})
// Promise-style connect(): the server echoes back everything it received
// on the tunnel once the client ends its side.
test('pool connect', (t) => {
  t.plan(1)

  // Ordinary requests must never reach the server in this test.
  const server = createServer(() => {
    t.fail()
  })

  server.on('connect', (req, tunnel, firstBodyChunk) => {
    tunnel.write('HTTP/1.1 200 Connection established\r\n\r\n')

    let echoed = firstBodyChunk.toString()
    tunnel.on('data', (chunk) => {
      echoed += chunk.toString()
    })

    tunnel.on('end', () => {
      tunnel.end(echoed)
    })
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.close.bind(pool))

    const connected = await pool.connect({ path: '/' })
    const { socket } = connected

    let received = ''
    socket.on('data', (chunk) => {
      received += chunk
    })

    socket.on('end', () => {
      t.equal(received.toString(), 'Body')
    })

    socket.write('Body')
    socket.end()
  })
})
// Sends a GET through the low-level dispatch() API and checks the status
// code and the accumulated response payload.
test('pool dispatch', (t) => {
  t.plan(2)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const client = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(client.close.bind(client))

    let buf = ''
    client.dispatch({
      path: '/',
      method: 'GET'
    }, {
      onConnect () {
      },
      onHeaders (statusCode, headers) {
        t.equal(statusCode, 200)
      },
      onData (chunk) {
        buf += chunk
      },
      onComplete () {
        t.equal(buf, 'asd')
      },
      onError (err) {
        // No error is expected on this path; report it instead of silently
        // swallowing it, so a failure shows its real cause.
        t.error(err)
      }
    })
  })
})
// pipeline() called with null options must surface an InvalidArgumentError
// mentioning "opts" on the returned stream.
test('pool pipeline args validation', (t) => {
  t.plan(2)

  const client = new Pool('http://localhost:5000')
  // Close the pool when the test ends, as the other tests in this file do.
  t.teardown(client.close.bind(client))

  const ret = client.pipeline(null, () => {})
  ret.on('error', (err) => {
    t.ok(/opts/.test(err.message))
    t.type(err, errors.InvalidArgumentError)
  })
})
// Pushes 300 requests through a single-connection pool; each one must
// succeed and deliver the expected payload.
test('300 requests succeed', (t) => {
  // Three assertions per request: no error, payload chunk, end of body.
  t.plan(300 * 3)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1
    })
    t.teardown(pool.destroy.bind(pool))

    const onResponse = (err, data) => {
      t.error(err)

      const { body } = data
      body.on('data', (chunk) => {
        t.equal(chunk.toString(), 'asd')
      })
      body.on('end', () => {
        t.pass()
      })
    }

    for (let issued = 0; issued < 300; ++issued) {
      pool.request({ path: '/', method: 'GET' }, onResponse)
    }
  })
})
// The server destroys the socket on CONNECT, so connect() is expected to
// reject.
test('pool connect error', (t) => {
  t.plan(1)

  // Ordinary requests must never reach the server in this test.
  const server = createServer(() => {
    t.fail()
  })
  server.on('connect', (req, tunnel) => {
    tunnel.destroy()
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.close.bind(pool))

    const connectOptions = { path: '/' }
    try {
      await pool.connect(connectOptions)
    } catch (err) {
      t.ok(err)
    }
  })
})
// The raw server answers 101 without an `upgrade` header; upgrade() is
// expected to reject.
test('pool upgrade error', (t) => {
  t.plan(1)

  // Written verbatim, one write per entry, whenever the server receives data.
  const responseParts = [
    'HTTP/1.1 101\r\n',
    'hello: world\r\n',
    'connection: upgrade\r\n',
    '\r\n',
    'Body'
  ]

  const server = net.createServer((conn) => {
    conn.on('data', () => {
      for (const part of responseParts) {
        conn.write(part)
      }
    })
    conn.on('error', () => {
      // Whether we get an error, end or close is undefined.
      // Ignore error.
    })
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`)
    t.teardown(pool.close.bind(pool))

    const upgradeOptions = {
      path: '/',
      method: 'GET',
      protocol: 'Websocket'
    }
    try {
      await pool.upgrade(upgradeOptions)
    } catch (err) {
      t.ok(err)
    }
  })
})
// Dispatches one valid request followed by one carrying a
// 'transfer-encoding': 'fail' header; the second must be reported through
// onError with code UND_ERR_INVALID_ARG and never reach its other callbacks.
test('pool dispatch error', (t) => {
  t.plan(3)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.close.bind(pool))

    // Handler for the request that is expected to succeed.
    const succeedingHandler = {
      onConnect () {},
      onHeaders (statusCode, headers) {
        t.equal(statusCode, 200)
      },
      onData (chunk) {},
      onComplete () {
        t.pass()
      },
      onError () {}
    }

    // Handler for the request that is expected to be rejected.
    const rejectedHandler = {
      onConnect () {
        t.fail()
      },
      onHeaders (statusCode, headers) {
        t.fail()
      },
      onData (chunk) {
        t.fail()
      },
      onError (err) {
        t.equal(err.code, 'UND_ERR_INVALID_ARG')
      }
    }

    pool.dispatch({ path: '/', method: 'GET' }, succeedingHandler)

    pool.dispatch({
      path: '/',
      method: 'GET',
      headers: {
        'transfer-encoding': 'fail'
      }
    }, rejectedHandler)
  })
})
// With one connection and no pipelining, a second request() is aborted via
// its signal right after being issued and must fail with UND_ERR_ABORTED.
test('pool request abort in queue', (t) => {
  t.plan(3)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.close.bind(pool))

    // First request: dispatched ahead of the one that gets aborted.
    const firstHandler = {
      onConnect () {},
      onHeaders (statusCode, headers) {
        t.equal(statusCode, 200)
      },
      onData (chunk) {},
      onComplete () {
        t.pass()
      },
      onError () {}
    }
    pool.dispatch({ path: '/', method: 'GET' }, firstHandler)

    // A plain EventEmitter serves as the abort signal.
    const signal = new EventEmitter()
    const abortedOptions = {
      path: '/',
      method: 'GET',
      signal
    }
    pool.request(abortedOptions, (err) => {
      t.equal(err.code, 'UND_ERR_ABORTED')
    })
    signal.emit('abort')
  })
})
// With one connection and no pipelining, a second stream() call is aborted
// via its signal right after being issued and must fail with UND_ERR_ABORTED.
test('pool stream abort in queue', (t) => {
  t.plan(3)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.close.bind(pool))

    // First request: dispatched ahead of the one that gets aborted.
    const firstHandler = {
      onConnect () {},
      onHeaders (statusCode, headers) {
        t.equal(statusCode, 200)
      },
      onData (chunk) {},
      onComplete () {
        t.pass()
      },
      onError () {}
    }
    pool.dispatch({ path: '/', method: 'GET' }, firstHandler)

    // A plain EventEmitter serves as the abort signal.
    const signal = new EventEmitter()
    const abortedOptions = {
      path: '/',
      method: 'GET',
      signal
    }
    const factory = ({ body }) => body
    pool.stream(abortedOptions, factory, (err) => {
      t.equal(err.code, 'UND_ERR_ABORTED')
    })
    signal.emit('abort')
  })
})
// With one connection and no pipelining, a second pipeline() call is aborted
// via its signal right after being issued; the returned stream must emit an
// error with code UND_ERR_ABORTED.
test('pool pipeline abort in queue', (t) => {
  t.plan(3)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.close.bind(pool))

    // First request: dispatched ahead of the one that gets aborted.
    const firstHandler = {
      onConnect () {},
      onHeaders (statusCode, headers) {
        t.equal(statusCode, 200)
      },
      onData (chunk) {},
      onComplete () {
        t.pass()
      },
      onError () {}
    }
    pool.dispatch({ path: '/', method: 'GET' }, firstHandler)

    // A plain EventEmitter serves as the abort signal.
    const signal = new EventEmitter()
    const abortedOptions = {
      path: '/',
      method: 'GET',
      signal
    }
    const duplex = pool.pipeline(abortedOptions, ({ body }) => body)
    duplex.end()
    duplex.on('error', (err) => {
      t.equal(err.code, 'UND_ERR_ABORTED')
    })
    signal.emit('abort')
  })
})
// When stream() rejects its options with UND_ERR_INVALID_ARG, the supplied
// request body stream must have been destroyed. Two option sets are tried.
test('pool stream constructor error destroy body', (t) => {
  t.plan(4)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.close.bind(pool))

    // The factory must never run for a rejected request.
    const unexpectedFactory = () => {
      t.fail()
    }

    // Case 1: GET carrying a 'transfer-encoding': 'fail' header.
    const headerCaseBody = new Readable({
      read () {}
    })
    pool.stream({
      path: '/',
      method: 'GET',
      body: headerCaseBody,
      headers: {
        'transfer-encoding': 'fail'
      }
    }, unexpectedFactory, (err) => {
      t.equal(err.code, 'UND_ERR_INVALID_ARG')
      t.equal(headerCaseBody.destroyed, true)
    })

    // Case 2: CONNECT method with a body.
    const connectCaseBody = new Readable({
      read () {}
    })
    pool.stream({
      path: '/',
      method: 'CONNECT',
      body: connectCaseBody
    }, unexpectedFactory, (err) => {
      t.equal(err.code, 'UND_ERR_INVALID_ARG')
      t.equal(connectCaseBody.destroyed, true)
    })
  })
})
// When request() rejects its options with UND_ERR_INVALID_ARG, the supplied
// request body stream must have been destroyed. Two option sets are tried.
test('pool request constructor error destroy body', (t) => {
  t.plan(4)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.close.bind(pool))

    // Case 1: GET carrying a 'transfer-encoding': 'fail' header.
    const headerCaseBody = new Readable({
      read () {}
    })
    pool.request({
      path: '/',
      method: 'GET',
      body: headerCaseBody,
      headers: {
        'transfer-encoding': 'fail'
      }
    }, (err) => {
      t.equal(err.code, 'UND_ERR_INVALID_ARG')
      t.equal(headerCaseBody.destroyed, true)
    })

    // Case 2: CONNECT method with a body.
    const connectCaseBody = new Readable({
      read () {}
    })
    pool.request({
      path: '/',
      method: 'CONNECT',
      body: connectCaseBody
    }, (err) => {
      t.equal(err.code, 'UND_ERR_INVALID_ARG')
      t.equal(connectCaseBody.destroyed, true)
    })
  })
})
// Requests issued before close() must complete without error, both close()
// callbacks must fire, and a request issued after close() must fail with
// ClientClosedError.
test('pool close waits for all requests', (t) => {
  t.plan(5)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.destroy.bind(pool))

    const expectNoError = (err) => {
      t.error(err)
    }
    const onClosed = () => {
      t.pass()
    }

    // Two requests issued before closing.
    pool.request({ path: '/', method: 'GET' }, expectNoError)
    pool.request({ path: '/', method: 'GET' }, expectNoError)

    // close() is called twice; each callback accounts for one assertion.
    pool.close(onClosed)
    pool.close(onClosed)

    // Issued after close().
    pool.request({ path: '/', method: 'GET' }, (err) => {
      t.type(err, errors.ClientClosedError)
    })
  })
})
// A request issued after destroy() must fail with ClientDestroyedError.
test('pool destroyed', (t) => {
  t.plan(1)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, () => {
    const origin = `http://localhost:${server.address().port}`
    const pool = new Pool(origin, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.destroy.bind(pool))

    pool.destroy()

    const requestOptions = { path: '/', method: 'GET' }
    pool.request(requestOptions, (err) => {
      t.type(err, errors.ClientDestroyedError)
    })
  })
})
// destroy(err) must hand that exact error to requests issued beforehand;
// a request issued afterwards must fail with ClientDestroyedError.
test('pool destroy fails queued requests', (t) => {
  t.plan(6)

  const server = createServer((req, res) => {
    res.end('asd')
  })
  t.teardown(server.close.bind(server))

  server.listen(0, async () => {
    const pool = new Pool(`http://localhost:${server.address().port}`, {
      connections: 1,
      pipelining: 1
    })
    t.teardown(pool.destroy.bind(pool))

    const destroyReason = new Error()
    const expectDestroyReason = (err) => {
      t.equal(err, destroyReason)
    }

    // Two requests issued before destroy().
    pool.request({ path: '/', method: 'GET' }, expectDestroyReason)
    pool.request({ path: '/', method: 'GET' }, expectDestroyReason)

    t.equal(pool.destroyed, false)
    pool.destroy(destroyReason, () => {
      t.pass()
    })
    // Asserted synchronously, right after destroy() was called.
    t.equal(pool.destroyed, true)

    // Issued after destroy().
    pool.request({ path: '/', method: 'GET' }, (err) => {
      t.type(err, errors.ClientDestroyedError)
    })
  })
})