mirror of
https://github.com/nodejs/node.git
synced 2025-05-10 17:57:53 +00:00
src: do not reuse async resource in http parsers
Change resource being used, previously HTTParser was being reused. We are now using IncomingMessage and ClientRequest objects. The goal here is to make the async resource unique for each async operatio Refs: https://github.com/nodejs/node/pull/24330 Refs: https://github.com/nodejs/diagnostics/issues/248 Refs: https://github.com/nodejs/node/pull/21313 Co-authored-by: Matheus Marchini <mat@mmarchini.me> PR-URL: https://github.com/nodejs/node/pull/25094 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benedikt Meurer <benedikt.meurer@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
parent
5aaf666b3b
commit
ece507394a
@ -236,10 +236,10 @@ The `type` is a string identifying the type of resource that caused
|
||||
resource's constructor.
|
||||
|
||||
```text
|
||||
FSEVENTWRAP, FSREQCALLBACK, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPPARSER,
|
||||
JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP, SHUTDOWNWRAP,
|
||||
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVERWRAP, TCPWRAP, TTYWRAP,
|
||||
UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
|
||||
FSEVENTWRAP, FSREQCALLBACK, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPINCOMINGMESSAGE,
|
||||
HTTPCLIENTREQUEST, JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP,
|
||||
SHUTDOWNWRAP, SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVERWRAP, TCPWRAP,
|
||||
TTYWRAP, UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
|
||||
RANDOMBYTESREQUEST, TLSWRAP, Microtask, Timeout, Immediate, TickObject
|
||||
```
|
||||
|
||||
|
@ -632,11 +632,10 @@ function emitFreeNT(socket) {
|
||||
}
|
||||
|
||||
function tickOnSocket(req, socket) {
|
||||
const isParserReused = parsers.hasItems();
|
||||
const parser = parsers.alloc();
|
||||
req.socket = socket;
|
||||
req.connection = socket;
|
||||
parser.reinitialize(HTTPParser.RESPONSE, isParserReused);
|
||||
parser.initialize(HTTPParser.RESPONSE, req);
|
||||
parser.socket = socket;
|
||||
parser.outgoing = req;
|
||||
req.parser = parser;
|
||||
|
@ -126,6 +126,12 @@ const STATUS_CODES = {
|
||||
|
||||
const kOnExecute = HTTPParser.kOnExecute | 0;
|
||||
|
||||
class HTTPServerAsyncResource {
|
||||
constructor(type, socket) {
|
||||
this.type = type;
|
||||
this.socket = socket;
|
||||
}
|
||||
}
|
||||
|
||||
function ServerResponse(req) {
|
||||
OutgoingMessage.call(this);
|
||||
@ -349,9 +355,15 @@ function connectionListenerInternal(server, socket) {
|
||||
socket.setTimeout(server.timeout);
|
||||
socket.on('timeout', socketOnTimeout);
|
||||
|
||||
const isParserReused = parsers.hasItems();
|
||||
const parser = parsers.alloc();
|
||||
parser.reinitialize(HTTPParser.REQUEST, isParserReused);
|
||||
|
||||
// TODO(addaleax): This doesn't play well with the
|
||||
// `async_hooks.currentResource()` proposal, see
|
||||
// https://github.com/nodejs/node/pull/21313
|
||||
parser.initialize(
|
||||
HTTPParser.REQUEST,
|
||||
new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket)
|
||||
);
|
||||
parser.socket = socket;
|
||||
|
||||
// We are starting to wait for our headers.
|
||||
|
@ -34,6 +34,11 @@ inline AsyncWrap::ProviderType AsyncWrap::provider_type() const {
|
||||
return provider_type_;
|
||||
}
|
||||
|
||||
inline AsyncWrap::ProviderType AsyncWrap::set_provider_type(
|
||||
AsyncWrap::ProviderType provider) {
|
||||
provider_type_ = provider;
|
||||
return provider_type_;
|
||||
}
|
||||
|
||||
inline double AsyncWrap::get_async_id() const {
|
||||
return async_id_;
|
||||
|
@ -602,11 +602,15 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
|
||||
env->destroy_async_id_list()->push_back(async_id);
|
||||
}
|
||||
|
||||
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
|
||||
AsyncReset(object(), execution_async_id, silent);
|
||||
}
|
||||
|
||||
// Generalized call for both the constructor and for handles that are pooled
|
||||
// and reused over their lifetime. This way a new uid can be assigned when
|
||||
// the resource is pulled out of the pool and put back into use.
|
||||
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
|
||||
void AsyncWrap::AsyncReset(Local<Object> resource, double execution_async_id,
|
||||
bool silent) {
|
||||
if (async_id_ != -1) {
|
||||
// This instance was in use before, we have already emitted an init with
|
||||
// its previous async_id and need to emit a matching destroy for that
|
||||
@ -643,7 +647,7 @@ void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
|
||||
|
||||
if (silent) return;
|
||||
|
||||
EmitAsyncInit(env(), object(),
|
||||
EmitAsyncInit(env(), resource,
|
||||
env()->async_hooks()->provider_string(provider_type()),
|
||||
async_id_, trigger_async_id_);
|
||||
}
|
||||
|
@ -46,7 +46,8 @@ namespace node {
|
||||
V(HTTP2STREAM) \
|
||||
V(HTTP2PING) \
|
||||
V(HTTP2SETTINGS) \
|
||||
V(HTTPPARSER) \
|
||||
V(HTTPINCOMINGMESSAGE) \
|
||||
V(HTTPCLIENTREQUEST) \
|
||||
V(JSSTREAM) \
|
||||
V(MESSAGEPORT) \
|
||||
V(PIPECONNECTWRAP) \
|
||||
@ -147,11 +148,16 @@ class AsyncWrap : public BaseObject {
|
||||
static void DestroyAsyncIdsCallback(Environment* env, void* data);
|
||||
|
||||
inline ProviderType provider_type() const;
|
||||
inline ProviderType set_provider_type(ProviderType provider);
|
||||
|
||||
inline double get_async_id() const;
|
||||
|
||||
inline double get_trigger_async_id() const;
|
||||
|
||||
void AsyncReset(v8::Local<v8::Object> resource,
|
||||
double execution_async_id = -1,
|
||||
bool silent = false);
|
||||
|
||||
void AsyncReset(double execution_async_id = -1, bool silent = false);
|
||||
|
||||
// Only call these within a valid HandleScope.
|
||||
@ -202,7 +208,7 @@ class AsyncWrap : public BaseObject {
|
||||
ProviderType provider,
|
||||
double execution_async_id,
|
||||
bool silent);
|
||||
const ProviderType provider_type_;
|
||||
ProviderType provider_type_;
|
||||
// Because the values may be Reset(), cannot be made const.
|
||||
double async_id_ = -1;
|
||||
double trigger_async_id_;
|
||||
|
@ -152,11 +152,13 @@ struct StringPtr {
|
||||
size_t size_;
|
||||
};
|
||||
|
||||
|
||||
class Parser : public AsyncWrap, public StreamListener {
|
||||
public:
|
||||
Parser(Environment* env, Local<Object> wrap, parser_type_t type)
|
||||
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER),
|
||||
: AsyncWrap(env, wrap,
|
||||
type == HTTP_REQUEST ?
|
||||
AsyncWrap::PROVIDER_HTTPINCOMINGMESSAGE :
|
||||
AsyncWrap::PROVIDER_HTTPCLIENTREQUEST),
|
||||
current_buffer_len_(0),
|
||||
current_buffer_data_(nullptr) {
|
||||
Init(type);
|
||||
@ -503,12 +505,12 @@ class Parser : public AsyncWrap, public StreamListener {
|
||||
}
|
||||
|
||||
|
||||
static void Reinitialize(const FunctionCallbackInfo<Value>& args) {
|
||||
static void Initialize(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
|
||||
CHECK(args[0]->IsInt32());
|
||||
CHECK(args[1]->IsBoolean());
|
||||
bool isReused = args[1]->IsTrue();
|
||||
CHECK(args[1]->IsObject());
|
||||
|
||||
parser_type_t type =
|
||||
static_cast<parser_type_t>(args[0].As<Int32>()->Value());
|
||||
|
||||
@ -517,16 +519,16 @@ class Parser : public AsyncWrap, public StreamListener {
|
||||
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
|
||||
// Should always be called from the same context.
|
||||
CHECK_EQ(env, parser->env());
|
||||
// This parser has either just been created or it is being reused.
|
||||
// We must only call AsyncReset for the latter case, because AsyncReset has
|
||||
// already been called via the constructor for the former case.
|
||||
if (isReused) {
|
||||
parser->AsyncReset();
|
||||
}
|
||||
|
||||
AsyncWrap::ProviderType provider =
|
||||
(type == HTTP_REQUEST ?
|
||||
AsyncWrap::PROVIDER_HTTPINCOMINGMESSAGE
|
||||
: AsyncWrap::PROVIDER_HTTPCLIENTREQUEST);
|
||||
|
||||
parser->set_provider_type(provider);
|
||||
parser->Init(type);
|
||||
}
|
||||
|
||||
|
||||
template <bool should_pause>
|
||||
static void Pause(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
@ -958,7 +960,7 @@ void InitializeHttpParser(Local<Object> target,
|
||||
env->SetProtoMethod(t, "free", Parser::Free);
|
||||
env->SetProtoMethod(t, "execute", Parser::Execute);
|
||||
env->SetProtoMethod(t, "finish", Parser::Finish);
|
||||
env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
|
||||
env->SetProtoMethod(t, "initialize", Parser::Initialize);
|
||||
env->SetProtoMethod(t, "pause", Parser::Pause<true>);
|
||||
env->SetProtoMethod(t, "resume", Parser::Pause<false>);
|
||||
env->SetProtoMethod(t, "consume", Parser::Consume);
|
||||
|
39
test/async-hooks/test-httparser-reuse.js
Normal file
39
test/async-hooks/test-httparser-reuse.js
Normal file
@ -0,0 +1,39 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const http = require('http');
|
||||
const assert = require('assert');
|
||||
const { createHook } = require('async_hooks');
|
||||
const reused = Symbol('reused');
|
||||
|
||||
let reusedHTTPParser = false;
|
||||
const asyncHook = createHook({
|
||||
init(asyncId, type, triggerAsyncId, resource) {
|
||||
if (resource[reused]) {
|
||||
reusedHTTPParser = true;
|
||||
}
|
||||
resource[reused] = true;
|
||||
}
|
||||
});
|
||||
asyncHook.enable();
|
||||
|
||||
const server = http.createServer(function(req, res) {
|
||||
res.end();
|
||||
});
|
||||
|
||||
const PORT = 3000;
|
||||
const url = 'http://127.0.0.1:' + PORT;
|
||||
|
||||
server.listen(PORT, common.mustCall(() => {
|
||||
http.get(url, common.mustCall(() => {
|
||||
server.close(common.mustCall(() => {
|
||||
server.listen(PORT, common.mustCall(() => {
|
||||
http.get(url, common.mustCall(() => {
|
||||
server.close(common.mustCall(() => {
|
||||
assert.strictEqual(reusedHTTPParser, false);
|
||||
}));
|
||||
}));
|
||||
}));
|
||||
}));
|
||||
}));
|
||||
}));
|
@ -21,7 +21,7 @@ const request = Buffer.from(
|
||||
);
|
||||
|
||||
const parser = new HTTPParser(REQUEST);
|
||||
const as = hooks.activitiesOfTypes('HTTPPARSER');
|
||||
const as = hooks.activitiesOfTypes('HTTPINCOMINGMESSAGE');
|
||||
const httpparser = as[0];
|
||||
|
||||
assert.strictEqual(as.length, 1);
|
||||
@ -47,7 +47,7 @@ process.on('exit', onexit);
|
||||
|
||||
function onexit() {
|
||||
hooks.disable();
|
||||
hooks.sanityCheck('HTTPPARSER');
|
||||
hooks.sanityCheck('HTTPINCOMINGMESSAGE');
|
||||
checkInvocations(httpparser, { init: 1, before: 1, after: 1, destroy: 1 },
|
||||
'when process exits');
|
||||
}
|
||||
|
@ -26,7 +26,7 @@ const request = Buffer.from(
|
||||
);
|
||||
|
||||
const parser = new HTTPParser(RESPONSE);
|
||||
const as = hooks.activitiesOfTypes('HTTPPARSER');
|
||||
const as = hooks.activitiesOfTypes('HTTPCLIENTREQUEST');
|
||||
const httpparser = as[0];
|
||||
|
||||
assert.strictEqual(as.length, 1);
|
||||
@ -58,7 +58,7 @@ process.on('exit', onexit);
|
||||
|
||||
function onexit() {
|
||||
hooks.disable();
|
||||
hooks.sanityCheck('HTTPPARSER');
|
||||
hooks.sanityCheck('HTTPCLIENTREQUEST');
|
||||
checkInvocations(httpparser, { init: 1, before: 2, after: 2, destroy: 1 },
|
||||
'when process exits');
|
||||
}
|
||||
|
@ -95,7 +95,7 @@ function expectBody(expected) {
|
||||
throw new Error('hello world');
|
||||
};
|
||||
|
||||
parser.reinitialize(HTTPParser.REQUEST, false);
|
||||
parser.initialize(HTTPParser.REQUEST, request);
|
||||
|
||||
assert.throws(
|
||||
() => { parser.execute(request, 0, request.length); },
|
||||
@ -555,7 +555,7 @@ function expectBody(expected) {
|
||||
parser[kOnBody] = expectBody('ping');
|
||||
parser.execute(req1, 0, req1.length);
|
||||
|
||||
parser.reinitialize(REQUEST, false);
|
||||
parser.initialize(REQUEST, req2);
|
||||
parser[kOnBody] = expectBody('pong');
|
||||
parser[kOnHeadersComplete] = onHeadersComplete2;
|
||||
parser.execute(req2, 0, req2.length);
|
||||
|
@ -48,6 +48,8 @@ const { getSystemErrorName } = require('util');
|
||||
if (!common.isMainThread)
|
||||
delete providers.INSPECTORJSBINDING;
|
||||
delete providers.KEYPAIRGENREQUEST;
|
||||
delete providers.HTTPCLIENTREQUEST;
|
||||
delete providers.HTTPINCOMINGMESSAGE;
|
||||
|
||||
const objKeys = Object.keys(providers);
|
||||
if (objKeys.length > 0)
|
||||
|
@ -7,6 +7,7 @@ const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const httpCommon = require('_http_common');
|
||||
const { HTTPParser } = require('_http_common');
|
||||
const { AsyncResource } = require('async_hooks');
|
||||
const net = require('net');
|
||||
|
||||
const COUNT = httpCommon.parsers.max + 1;
|
||||
@ -24,7 +25,7 @@ function execAndClose() {
|
||||
process.stdout.write('.');
|
||||
|
||||
const parser = parsers.pop();
|
||||
parser.reinitialize(HTTPParser.RESPONSE, !!parser.reused);
|
||||
parser.initialize(HTTPParser.RESPONSE, new AsyncResource('ClientRequest'));
|
||||
|
||||
const socket = net.connect(common.PORT);
|
||||
socket.on('error', (e) => {
|
||||
|
Loading…
Reference in New Issue
Block a user