mirror of
https://git.proxmox.com/git/package-rebuilds
synced 2025-08-22 06:36:36 +00:00
1102 lines
24 KiB
JavaScript
1102 lines
24 KiB
JavaScript
'use strict'
|
|
|
|
const { EventEmitter } = require('events')
|
|
const { createServer } = require('http')
|
|
const net = require('net')
|
|
const {
|
|
finished,
|
|
PassThrough,
|
|
Readable
|
|
} = require('stream')
|
|
const { promisify } = require('util')
|
|
const proxyquire = require('proxyquire')
|
|
const { test } = require('tap')
|
|
const {
|
|
kBusy,
|
|
kPending,
|
|
kRunning,
|
|
kSize,
|
|
kUrl
|
|
} = require('../lib/core/symbols')
|
|
const {
|
|
Client,
|
|
Pool,
|
|
errors
|
|
} = require('..')
|
|
|
|
test('throws when connection is inifinite', (t) => {
|
|
t.plan(2)
|
|
|
|
try {
|
|
new Pool(null, { connections: 0 / 0 }) // eslint-disable-line
|
|
} catch (e) {
|
|
t.type(e, errors.InvalidArgumentError)
|
|
t.equal(e.message, 'invalid connections')
|
|
}
|
|
})
|
|
|
|
test('throws when connections is negative', (t) => {
|
|
t.plan(2)
|
|
|
|
try {
|
|
new Pool(null, { connections: -1 }) // eslint-disable-line no-new
|
|
} catch (e) {
|
|
t.type(e, errors.InvalidArgumentError)
|
|
t.equal(e.message, 'invalid connections')
|
|
}
|
|
})
|
|
|
|
test('throws when connection is not number', (t) => {
|
|
t.plan(2)
|
|
|
|
try {
|
|
new Pool(null, { connections: true }) // eslint-disable-line no-new
|
|
} catch (e) {
|
|
t.type(e, errors.InvalidArgumentError)
|
|
t.equal(e.message, 'invalid connections')
|
|
}
|
|
})
|
|
|
|
test('throws when factory is not a function', (t) => {
|
|
t.plan(2)
|
|
|
|
try {
|
|
new Pool(null, { factory: '' }) // eslint-disable-line no-new
|
|
} catch (e) {
|
|
t.type(e, errors.InvalidArgumentError)
|
|
t.equal(e.message, 'factory must be a function.')
|
|
}
|
|
})
|
|
|
|
test('does not throw when connect is a function', (t) => {
|
|
t.plan(1)
|
|
|
|
t.doesNotThrow(() => new Pool('http://localhost', { connect: () => {} }))
|
|
})
|
|
|
|
test('connect/disconnect event(s)', (t) => {
|
|
const clients = 2
|
|
|
|
t.plan(clients * 6)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.writeHead(200, {
|
|
Connection: 'keep-alive',
|
|
'Keep-Alive': 'timeout=1s'
|
|
})
|
|
res.end('ok')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, () => {
|
|
const pool = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: clients,
|
|
keepAliveTimeoutThreshold: 100
|
|
})
|
|
t.teardown(pool.close.bind(pool))
|
|
|
|
pool.on('connect', (origin, [pool, client]) => {
|
|
t.equal(client instanceof Client, true)
|
|
})
|
|
pool.on('disconnect', (origin, [pool, client], error) => {
|
|
t.ok(client instanceof Client)
|
|
t.type(error, errors.InformationalError)
|
|
t.equal(error.code, 'UND_ERR_INFO')
|
|
t.equal(error.message, 'socket idle timeout')
|
|
})
|
|
|
|
for (let i = 0; i < clients; i++) {
|
|
pool.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err, { headers, body }) => {
|
|
t.error(err)
|
|
body.resume()
|
|
})
|
|
}
|
|
})
|
|
})
|
|
|
|
test('basic get', (t) => {
|
|
t.plan(14)
|
|
|
|
const server = createServer((req, res) => {
|
|
t.equal('/', req.url)
|
|
t.equal('GET', req.method)
|
|
res.setHeader('content-type', 'text/plain')
|
|
res.end('hello')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
t.equal(client[kUrl].origin, `http://localhost:${server.address().port}`)
|
|
|
|
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
|
|
t.error(err)
|
|
t.equal(statusCode, 200)
|
|
t.equal(headers['content-type'], 'text/plain')
|
|
const bufs = []
|
|
body.on('data', (buf) => {
|
|
bufs.push(buf)
|
|
})
|
|
body.on('end', () => {
|
|
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
|
|
})
|
|
})
|
|
|
|
t.equal(client.destroyed, false)
|
|
t.equal(client.closed, false)
|
|
client.close((err) => {
|
|
t.error(err)
|
|
t.equal(client.destroyed, true)
|
|
client.destroy((err) => {
|
|
t.error(err)
|
|
client.close((err) => {
|
|
t.type(err, errors.ClientDestroyedError)
|
|
})
|
|
})
|
|
})
|
|
t.equal(client.closed, true)
|
|
})
|
|
})
|
|
|
|
test('URL as arg', (t) => {
|
|
t.plan(9)
|
|
|
|
const server = createServer((req, res) => {
|
|
t.equal('/', req.url)
|
|
t.equal('GET', req.method)
|
|
res.setHeader('content-type', 'text/plain')
|
|
res.end('hello')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const url = new URL('http://localhost')
|
|
url.port = server.address().port
|
|
const client = new Pool(url)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
|
|
t.error(err)
|
|
t.equal(statusCode, 200)
|
|
t.equal(headers['content-type'], 'text/plain')
|
|
const bufs = []
|
|
body.on('data', (buf) => {
|
|
bufs.push(buf)
|
|
})
|
|
body.on('end', () => {
|
|
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
|
|
})
|
|
})
|
|
|
|
client.close((err) => {
|
|
t.error(err)
|
|
client.destroy((err) => {
|
|
t.error(err)
|
|
client.close((err) => {
|
|
t.type(err, errors.ClientDestroyedError)
|
|
})
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
test('basic get error async/await', (t) => {
|
|
t.plan(2)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.destroy()
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
await client.request({ path: '/', method: 'GET' })
|
|
.catch((err) => {
|
|
t.ok(err)
|
|
})
|
|
|
|
await client.destroy()
|
|
|
|
await client.close().catch((err) => {
|
|
t.type(err, errors.ClientDestroyedError)
|
|
})
|
|
})
|
|
})
|
|
|
|
test('basic get with async/await', async (t) => {
|
|
const server = createServer((req, res) => {
|
|
t.equal('/', req.url)
|
|
t.equal('GET', req.method)
|
|
res.setHeader('content-type', 'text/plain')
|
|
res.end('hello')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
await promisify(server.listen.bind(server))(0)
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
const { statusCode, headers, body } = await client.request({ path: '/', method: 'GET' })
|
|
t.equal(statusCode, 200)
|
|
t.equal(headers['content-type'], 'text/plain')
|
|
|
|
body.resume()
|
|
await promisify(finished)(body)
|
|
|
|
await client.close()
|
|
await client.destroy()
|
|
})
|
|
|
|
test('stream get async/await', async (t) => {
|
|
const server = createServer((req, res) => {
|
|
t.equal('/', req.url)
|
|
t.equal('GET', req.method)
|
|
res.setHeader('content-type', 'text/plain')
|
|
res.end('hello')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
await promisify(server.listen.bind(server))(0)
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
await client.stream({ path: '/', method: 'GET' }, ({ statusCode, headers }) => {
|
|
t.equal(statusCode, 200)
|
|
t.equal(headers['content-type'], 'text/plain')
|
|
return new PassThrough()
|
|
})
|
|
})
|
|
|
|
test('stream get error async/await', (t) => {
|
|
t.plan(1)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.destroy()
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
await client.stream({ path: '/', method: 'GET' }, () => {
|
|
|
|
})
|
|
.catch((err) => {
|
|
t.ok(err)
|
|
})
|
|
})
|
|
})
|
|
|
|
test('pipeline get', (t) => {
|
|
t.plan(5)
|
|
|
|
const server = createServer((req, res) => {
|
|
t.equal('/', req.url)
|
|
t.equal('GET', req.method)
|
|
res.setHeader('content-type', 'text/plain')
|
|
res.end('hello')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
const bufs = []
|
|
client.pipeline({ path: '/', method: 'GET' }, ({ statusCode, headers, body }) => {
|
|
t.equal(statusCode, 200)
|
|
t.equal(headers['content-type'], 'text/plain')
|
|
return body
|
|
})
|
|
.end()
|
|
.on('data', (buf) => {
|
|
bufs.push(buf)
|
|
})
|
|
.on('end', () => {
|
|
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
|
|
})
|
|
})
|
|
})
|
|
|
|
test('backpressure algorithm', (t) => {
|
|
const seen = []
|
|
let total = 0
|
|
|
|
let writeMore = true
|
|
|
|
class FakeClient extends EventEmitter {
|
|
constructor () {
|
|
super()
|
|
|
|
this.id = total++
|
|
}
|
|
|
|
dispatch (req, handler) {
|
|
seen.push({ req, client: this, id: this.id })
|
|
return writeMore
|
|
}
|
|
}
|
|
|
|
const Pool = proxyquire('../lib/pool', {
|
|
'./client': FakeClient
|
|
})
|
|
|
|
const noopHandler = {
|
|
onError (err) {
|
|
throw err
|
|
}
|
|
}
|
|
|
|
const pool = new Pool('http://notahost')
|
|
|
|
pool.dispatch({}, noopHandler)
|
|
pool.dispatch({}, noopHandler)
|
|
|
|
const d1 = seen.shift() // d1 = c0
|
|
t.equal(d1.id, 0)
|
|
const d2 = seen.shift() // d2 = c0
|
|
t.equal(d2.id, 0)
|
|
|
|
t.equal(d1.id, d2.id)
|
|
|
|
writeMore = false
|
|
|
|
pool.dispatch({}, noopHandler) // d3 = c0
|
|
|
|
pool.dispatch({}, noopHandler) // d4 = c1
|
|
|
|
const d3 = seen.shift()
|
|
t.equal(d3.id, 0)
|
|
const d4 = seen.shift()
|
|
t.equal(d4.id, 1)
|
|
|
|
t.equal(d3.id, d2.id)
|
|
t.not(d3.id, d4.id)
|
|
|
|
writeMore = true
|
|
|
|
d4.client.emit('drain', new URL('http://notahost'), [])
|
|
|
|
pool.dispatch({}, noopHandler) // d5 = c1
|
|
|
|
d3.client.emit('drain', new URL('http://notahost'), [])
|
|
|
|
pool.dispatch({}, noopHandler) // d6 = c0
|
|
|
|
const d5 = seen.shift()
|
|
t.equal(d5.id, 1)
|
|
const d6 = seen.shift()
|
|
t.equal(d6.id, 0)
|
|
|
|
t.equal(d5.id, d4.id)
|
|
t.equal(d3.id, d6.id)
|
|
|
|
t.equal(total, 3)
|
|
|
|
t.end()
|
|
})
|
|
|
|
test('busy', (t) => {
|
|
t.plan(8 * 16 + 2 + 1)
|
|
|
|
const server = createServer((req, res) => {
|
|
t.equal('/', req.url)
|
|
t.equal('GET', req.method)
|
|
res.setHeader('content-type', 'text/plain')
|
|
res.end('hello')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
const connections = 2
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections,
|
|
pipelining: 2
|
|
})
|
|
client.on('drain', () => {
|
|
t.pass()
|
|
})
|
|
client.on('connect', () => {
|
|
t.pass()
|
|
})
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
for (let n = 1; n <= 8; ++n) {
|
|
client.request({ path: '/', method: 'GET' }, (err, { statusCode, headers, body }) => {
|
|
t.error(err)
|
|
t.equal(statusCode, 200)
|
|
t.equal(headers['content-type'], 'text/plain')
|
|
const bufs = []
|
|
body.on('data', (buf) => {
|
|
bufs.push(buf)
|
|
})
|
|
body.on('end', () => {
|
|
t.equal('hello', Buffer.concat(bufs).toString('utf8'))
|
|
})
|
|
})
|
|
t.equal(client[kPending], n)
|
|
t.equal(client[kBusy], n > 1)
|
|
t.equal(client[kSize], n)
|
|
t.equal(client[kRunning], 0)
|
|
|
|
t.equal(client.stats.connected, 0)
|
|
t.equal(client.stats.free, 0)
|
|
t.equal(client.stats.queued, Math.max(n - connections, 0))
|
|
t.equal(client.stats.pending, n)
|
|
t.equal(client.stats.size, n)
|
|
t.equal(client.stats.running, 0)
|
|
}
|
|
})
|
|
})
|
|
|
|
test('invalid pool dispatch options', (t) => {
|
|
t.plan(2)
|
|
const pool = new Pool('http://notahost')
|
|
t.throws(() => pool.dispatch({}), errors.InvalidArgumentError, 'throws on invalid handler')
|
|
t.throws(() => pool.dispatch({}, {}), errors.InvalidArgumentError, 'throws on invalid handler')
|
|
})
|
|
|
|
test('pool upgrade promise', (t) => {
|
|
t.plan(2)
|
|
|
|
const server = net.createServer((c) => {
|
|
c.on('data', (d) => {
|
|
c.write('HTTP/1.1 101\r\n')
|
|
c.write('hello: world\r\n')
|
|
c.write('connection: upgrade\r\n')
|
|
c.write('upgrade: websocket\r\n')
|
|
c.write('\r\n')
|
|
c.write('Body')
|
|
})
|
|
|
|
c.on('end', () => {
|
|
c.end()
|
|
})
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.close.bind(client))
|
|
|
|
const { headers, socket } = await client.upgrade({
|
|
path: '/',
|
|
method: 'GET',
|
|
protocol: 'Websocket'
|
|
})
|
|
|
|
let recvData = ''
|
|
socket.on('data', (d) => {
|
|
recvData += d
|
|
})
|
|
|
|
socket.on('close', () => {
|
|
t.equal(recvData.toString(), 'Body')
|
|
})
|
|
|
|
t.same(headers, {
|
|
hello: 'world',
|
|
connection: 'upgrade',
|
|
upgrade: 'websocket'
|
|
})
|
|
socket.end()
|
|
})
|
|
})
|
|
|
|
test('pool connect', (t) => {
|
|
t.plan(1)
|
|
|
|
const server = createServer((c) => {
|
|
t.fail()
|
|
})
|
|
server.on('connect', (req, socket, firstBodyChunk) => {
|
|
socket.write('HTTP/1.1 200 Connection established\r\n\r\n')
|
|
|
|
let data = firstBodyChunk.toString()
|
|
socket.on('data', (buf) => {
|
|
data += buf.toString()
|
|
})
|
|
|
|
socket.on('end', () => {
|
|
socket.end(data)
|
|
})
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.close.bind(client))
|
|
|
|
const { socket } = await client.connect({
|
|
path: '/'
|
|
})
|
|
|
|
let recvData = ''
|
|
socket.on('data', (d) => {
|
|
recvData += d
|
|
})
|
|
|
|
socket.on('end', () => {
|
|
t.equal(recvData.toString(), 'Body')
|
|
})
|
|
|
|
socket.write('Body')
|
|
socket.end()
|
|
})
|
|
})
|
|
|
|
test('pool dispatch', (t) => {
|
|
t.plan(2)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.close.bind(client))
|
|
|
|
let buf = ''
|
|
client.dispatch({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, {
|
|
onConnect () {
|
|
},
|
|
onHeaders (statusCode, headers) {
|
|
t.equal(statusCode, 200)
|
|
},
|
|
onData (chunk) {
|
|
buf += chunk
|
|
},
|
|
onComplete () {
|
|
t.equal(buf, 'asd')
|
|
},
|
|
onError () {
|
|
}
|
|
})
|
|
})
|
|
})
|
|
|
|
test('pool pipeline args validation', (t) => {
|
|
t.plan(2)
|
|
|
|
const client = new Pool('http://localhost:5000')
|
|
|
|
const ret = client.pipeline(null, () => {})
|
|
ret.on('error', (err) => {
|
|
t.ok(/opts/.test(err.message))
|
|
t.type(err, errors.InvalidArgumentError)
|
|
})
|
|
})
|
|
|
|
test('300 requests succeed', (t) => {
|
|
t.plan(300 * 3)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1
|
|
})
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
for (let n = 0; n < 300; ++n) {
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err, data) => {
|
|
t.error(err)
|
|
data.body.on('data', (chunk) => {
|
|
t.equal(chunk.toString(), 'asd')
|
|
}).on('end', () => {
|
|
t.pass()
|
|
})
|
|
})
|
|
}
|
|
})
|
|
})
|
|
|
|
test('pool connect error', (t) => {
|
|
t.plan(1)
|
|
|
|
const server = createServer((c) => {
|
|
t.fail()
|
|
})
|
|
server.on('connect', (req, socket, firstBodyChunk) => {
|
|
socket.destroy()
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.close.bind(client))
|
|
|
|
try {
|
|
await client.connect({
|
|
path: '/'
|
|
})
|
|
} catch (err) {
|
|
t.ok(err)
|
|
}
|
|
})
|
|
})
|
|
|
|
test('pool upgrade error', (t) => {
|
|
t.plan(1)
|
|
|
|
const server = net.createServer((c) => {
|
|
c.on('data', (d) => {
|
|
c.write('HTTP/1.1 101\r\n')
|
|
c.write('hello: world\r\n')
|
|
c.write('connection: upgrade\r\n')
|
|
c.write('\r\n')
|
|
c.write('Body')
|
|
})
|
|
c.on('error', () => {
|
|
// Whether we get an error, end or close is undefined.
|
|
// Ignore error.
|
|
})
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`)
|
|
t.teardown(client.close.bind(client))
|
|
|
|
try {
|
|
await client.upgrade({
|
|
path: '/',
|
|
method: 'GET',
|
|
protocol: 'Websocket'
|
|
})
|
|
} catch (err) {
|
|
t.ok(err)
|
|
}
|
|
})
|
|
})
|
|
|
|
test('pool dispatch error', (t) => {
|
|
t.plan(3)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.close.bind(client))
|
|
|
|
client.dispatch({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, {
|
|
onConnect () {
|
|
},
|
|
onHeaders (statusCode, headers) {
|
|
t.equal(statusCode, 200)
|
|
},
|
|
onData (chunk) {
|
|
},
|
|
onComplete () {
|
|
t.pass()
|
|
},
|
|
onError () {
|
|
}
|
|
})
|
|
|
|
client.dispatch({
|
|
path: '/',
|
|
method: 'GET',
|
|
headers: {
|
|
'transfer-encoding': 'fail'
|
|
}
|
|
}, {
|
|
onConnect () {
|
|
t.fail()
|
|
},
|
|
onHeaders (statusCode, headers) {
|
|
t.fail()
|
|
},
|
|
onData (chunk) {
|
|
t.fail()
|
|
},
|
|
onError (err) {
|
|
t.equal(err.code, 'UND_ERR_INVALID_ARG')
|
|
}
|
|
})
|
|
})
|
|
})
|
|
|
|
test('pool request abort in queue', (t) => {
|
|
t.plan(3)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.close.bind(client))
|
|
|
|
client.dispatch({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, {
|
|
onConnect () {
|
|
},
|
|
onHeaders (statusCode, headers) {
|
|
t.equal(statusCode, 200)
|
|
},
|
|
onData (chunk) {
|
|
},
|
|
onComplete () {
|
|
t.pass()
|
|
},
|
|
onError () {
|
|
}
|
|
})
|
|
|
|
const signal = new EventEmitter()
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET',
|
|
signal
|
|
}, (err) => {
|
|
t.equal(err.code, 'UND_ERR_ABORTED')
|
|
})
|
|
signal.emit('abort')
|
|
})
|
|
})
|
|
|
|
test('pool stream abort in queue', (t) => {
|
|
t.plan(3)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.close.bind(client))
|
|
|
|
client.dispatch({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, {
|
|
onConnect () {
|
|
},
|
|
onHeaders (statusCode, headers) {
|
|
t.equal(statusCode, 200)
|
|
},
|
|
onData (chunk) {
|
|
},
|
|
onComplete () {
|
|
t.pass()
|
|
},
|
|
onError () {
|
|
}
|
|
})
|
|
|
|
const signal = new EventEmitter()
|
|
client.stream({
|
|
path: '/',
|
|
method: 'GET',
|
|
signal
|
|
}, ({ body }) => body, (err) => {
|
|
t.equal(err.code, 'UND_ERR_ABORTED')
|
|
})
|
|
signal.emit('abort')
|
|
})
|
|
})
|
|
|
|
test('pool pipeline abort in queue', (t) => {
|
|
t.plan(3)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.close.bind(client))
|
|
|
|
client.dispatch({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, {
|
|
onConnect () {
|
|
},
|
|
onHeaders (statusCode, headers) {
|
|
t.equal(statusCode, 200)
|
|
},
|
|
onData (chunk) {
|
|
},
|
|
onComplete () {
|
|
t.pass()
|
|
},
|
|
onError () {
|
|
}
|
|
})
|
|
|
|
const signal = new EventEmitter()
|
|
client.pipeline({
|
|
path: '/',
|
|
method: 'GET',
|
|
signal
|
|
}, ({ body }) => body).end().on('error', (err) => {
|
|
t.equal(err.code, 'UND_ERR_ABORTED')
|
|
})
|
|
signal.emit('abort')
|
|
})
|
|
})
|
|
|
|
test('pool stream constructor error destroy body', (t) => {
|
|
t.plan(4)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.close.bind(client))
|
|
|
|
{
|
|
const body = new Readable({
|
|
read () {
|
|
}
|
|
})
|
|
client.stream({
|
|
path: '/',
|
|
method: 'GET',
|
|
body,
|
|
headers: {
|
|
'transfer-encoding': 'fail'
|
|
}
|
|
}, () => {
|
|
t.fail()
|
|
}, (err) => {
|
|
t.equal(err.code, 'UND_ERR_INVALID_ARG')
|
|
t.equal(body.destroyed, true)
|
|
})
|
|
}
|
|
|
|
{
|
|
const body = new Readable({
|
|
read () {
|
|
}
|
|
})
|
|
client.stream({
|
|
path: '/',
|
|
method: 'CONNECT',
|
|
body
|
|
}, () => {
|
|
t.fail()
|
|
}, (err) => {
|
|
t.equal(err.code, 'UND_ERR_INVALID_ARG')
|
|
t.equal(body.destroyed, true)
|
|
})
|
|
}
|
|
})
|
|
})
|
|
|
|
test('pool request constructor error destroy body', (t) => {
|
|
t.plan(4)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.close.bind(client))
|
|
|
|
{
|
|
const body = new Readable({
|
|
read () {
|
|
}
|
|
})
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET',
|
|
body,
|
|
headers: {
|
|
'transfer-encoding': 'fail'
|
|
}
|
|
}, (err) => {
|
|
t.equal(err.code, 'UND_ERR_INVALID_ARG')
|
|
t.equal(body.destroyed, true)
|
|
})
|
|
}
|
|
|
|
{
|
|
const body = new Readable({
|
|
read () {
|
|
}
|
|
})
|
|
client.request({
|
|
path: '/',
|
|
method: 'CONNECT',
|
|
body
|
|
}, (err) => {
|
|
t.equal(err.code, 'UND_ERR_INVALID_ARG')
|
|
t.equal(body.destroyed, true)
|
|
})
|
|
}
|
|
})
|
|
})
|
|
|
|
test('pool close waits for all requests', (t) => {
|
|
t.plan(5)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.error(err)
|
|
})
|
|
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.error(err)
|
|
})
|
|
|
|
client.close(() => {
|
|
t.pass()
|
|
})
|
|
|
|
client.close(() => {
|
|
t.pass()
|
|
})
|
|
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.type(err, errors.ClientClosedError)
|
|
})
|
|
})
|
|
})
|
|
|
|
test('pool destroyed', (t) => {
|
|
t.plan(1)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
client.destroy()
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.type(err, errors.ClientDestroyedError)
|
|
})
|
|
})
|
|
})
|
|
|
|
test('pool destroy fails queued requests', (t) => {
|
|
t.plan(6)
|
|
|
|
const server = createServer((req, res) => {
|
|
res.end('asd')
|
|
})
|
|
t.teardown(server.close.bind(server))
|
|
|
|
server.listen(0, async () => {
|
|
const client = new Pool(`http://localhost:${server.address().port}`, {
|
|
connections: 1,
|
|
pipelining: 1
|
|
})
|
|
t.teardown(client.destroy.bind(client))
|
|
|
|
const _err = new Error()
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.equal(err, _err)
|
|
})
|
|
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.equal(err, _err)
|
|
})
|
|
|
|
t.equal(client.destroyed, false)
|
|
client.destroy(_err, () => {
|
|
t.pass()
|
|
})
|
|
t.equal(client.destroyed, true)
|
|
|
|
client.request({
|
|
path: '/',
|
|
method: 'GET'
|
|
}, (err) => {
|
|
t.type(err, errors.ClientDestroyedError)
|
|
})
|
|
})
|
|
})
|