src: do not reuse async resource in http parsers

Change resource being used, previously HTTParser was being reused.
We are now using IncomingMessage and ClientRequest objects.  The goal
here is to make the async resource unique for each async operatio

Refs: https://github.com/nodejs/node/pull/24330
Refs: https://github.com/nodejs/diagnostics/issues/248
Refs: https://github.com/nodejs/node/pull/21313

Co-authored-by: Matheus Marchini <mat@mmarchini.me>

PR-URL: https://github.com/nodejs/node/pull/25094
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benedikt Meurer <benedikt.meurer@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
This commit is contained in:
Daniel Beckert 2018-12-13 15:35:48 -02:00 committed by Matheus Marchini
parent 5aaf666b3b
commit ece507394a
No known key found for this signature in database
GPG Key ID: BE516BA4874DB4D9
13 changed files with 102 additions and 32 deletions

View File

@ -236,10 +236,10 @@ The `type` is a string identifying the type of resource that caused
resource's constructor.
```text
FSEVENTWRAP, FSREQCALLBACK, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPPARSER,
JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP, SHUTDOWNWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVERWRAP, TCPWRAP, TTYWRAP,
UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
FSEVENTWRAP, FSREQCALLBACK, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPINCOMINGMESSAGE,
HTTPCLIENTREQUEST, JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP,
SHUTDOWNWRAP, SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVERWRAP, TCPWRAP,
TTYWRAP, UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
RANDOMBYTESREQUEST, TLSWRAP, Microtask, Timeout, Immediate, TickObject
```

View File

@ -632,11 +632,10 @@ function emitFreeNT(socket) {
}
function tickOnSocket(req, socket) {
const isParserReused = parsers.hasItems();
const parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.reinitialize(HTTPParser.RESPONSE, isParserReused);
parser.initialize(HTTPParser.RESPONSE, req);
parser.socket = socket;
parser.outgoing = req;
req.parser = parser;

View File

@ -126,6 +126,12 @@ const STATUS_CODES = {
const kOnExecute = HTTPParser.kOnExecute | 0;
class HTTPServerAsyncResource {
constructor(type, socket) {
this.type = type;
this.socket = socket;
}
}
function ServerResponse(req) {
OutgoingMessage.call(this);
@ -349,9 +355,15 @@ function connectionListenerInternal(server, socket) {
socket.setTimeout(server.timeout);
socket.on('timeout', socketOnTimeout);
const isParserReused = parsers.hasItems();
const parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST, isParserReused);
// TODO(addaleax): This doesn't play well with the
// `async_hooks.currentResource()` proposal, see
// https://github.com/nodejs/node/pull/21313
parser.initialize(
HTTPParser.REQUEST,
new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket)
);
parser.socket = socket;
// We are starting to wait for our headers.

View File

@ -34,6 +34,11 @@ inline AsyncWrap::ProviderType AsyncWrap::provider_type() const {
return provider_type_;
}
inline AsyncWrap::ProviderType AsyncWrap::set_provider_type(
AsyncWrap::ProviderType provider) {
provider_type_ = provider;
return provider_type_;
}
inline double AsyncWrap::get_async_id() const {
return async_id_;

View File

@ -602,11 +602,15 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
env->destroy_async_id_list()->push_back(async_id);
}
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
AsyncReset(object(), execution_async_id, silent);
}
// Generalized call for both the constructor and for handles that are pooled
// and reused over their lifetime. This way a new uid can be assigned when
// the resource is pulled out of the pool and put back into use.
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
void AsyncWrap::AsyncReset(Local<Object> resource, double execution_async_id,
bool silent) {
if (async_id_ != -1) {
// This instance was in use before, we have already emitted an init with
// its previous async_id and need to emit a matching destroy for that
@ -643,7 +647,7 @@ void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
if (silent) return;
EmitAsyncInit(env(), object(),
EmitAsyncInit(env(), resource,
env()->async_hooks()->provider_string(provider_type()),
async_id_, trigger_async_id_);
}

View File

@ -46,7 +46,8 @@ namespace node {
V(HTTP2STREAM) \
V(HTTP2PING) \
V(HTTP2SETTINGS) \
V(HTTPPARSER) \
V(HTTPINCOMINGMESSAGE) \
V(HTTPCLIENTREQUEST) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
@ -147,11 +148,16 @@ class AsyncWrap : public BaseObject {
static void DestroyAsyncIdsCallback(Environment* env, void* data);
inline ProviderType provider_type() const;
inline ProviderType set_provider_type(ProviderType provider);
inline double get_async_id() const;
inline double get_trigger_async_id() const;
void AsyncReset(v8::Local<v8::Object> resource,
double execution_async_id = -1,
bool silent = false);
void AsyncReset(double execution_async_id = -1, bool silent = false);
// Only call these within a valid HandleScope.
@ -202,7 +208,7 @@ class AsyncWrap : public BaseObject {
ProviderType provider,
double execution_async_id,
bool silent);
const ProviderType provider_type_;
ProviderType provider_type_;
// Because the values may be Reset(), cannot be made const.
double async_id_ = -1;
double trigger_async_id_;

View File

@ -152,11 +152,13 @@ struct StringPtr {
size_t size_;
};
class Parser : public AsyncWrap, public StreamListener {
public:
Parser(Environment* env, Local<Object> wrap, parser_type_t type)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER),
: AsyncWrap(env, wrap,
type == HTTP_REQUEST ?
AsyncWrap::PROVIDER_HTTPINCOMINGMESSAGE :
AsyncWrap::PROVIDER_HTTPCLIENTREQUEST),
current_buffer_len_(0),
current_buffer_data_(nullptr) {
Init(type);
@ -503,12 +505,12 @@ class Parser : public AsyncWrap, public StreamListener {
}
static void Reinitialize(const FunctionCallbackInfo<Value>& args) {
static void Initialize(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
CHECK(args[0]->IsInt32());
CHECK(args[1]->IsBoolean());
bool isReused = args[1]->IsTrue();
CHECK(args[1]->IsObject());
parser_type_t type =
static_cast<parser_type_t>(args[0].As<Int32>()->Value());
@ -517,16 +519,16 @@ class Parser : public AsyncWrap, public StreamListener {
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// Should always be called from the same context.
CHECK_EQ(env, parser->env());
// This parser has either just been created or it is being reused.
// We must only call AsyncReset for the latter case, because AsyncReset has
// already been called via the constructor for the former case.
if (isReused) {
parser->AsyncReset();
}
AsyncWrap::ProviderType provider =
(type == HTTP_REQUEST ?
AsyncWrap::PROVIDER_HTTPINCOMINGMESSAGE
: AsyncWrap::PROVIDER_HTTPCLIENTREQUEST);
parser->set_provider_type(provider);
parser->Init(type);
}
template <bool should_pause>
static void Pause(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
@ -958,7 +960,7 @@ void InitializeHttpParser(Local<Object> target,
env->SetProtoMethod(t, "free", Parser::Free);
env->SetProtoMethod(t, "execute", Parser::Execute);
env->SetProtoMethod(t, "finish", Parser::Finish);
env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
env->SetProtoMethod(t, "initialize", Parser::Initialize);
env->SetProtoMethod(t, "pause", Parser::Pause<true>);
env->SetProtoMethod(t, "resume", Parser::Pause<false>);
env->SetProtoMethod(t, "consume", Parser::Consume);

View File

@ -0,0 +1,39 @@
'use strict';
const common = require('../common');
const http = require('http');
const assert = require('assert');
const { createHook } = require('async_hooks');
const reused = Symbol('reused');
let reusedHTTPParser = false;
const asyncHook = createHook({
init(asyncId, type, triggerAsyncId, resource) {
if (resource[reused]) {
reusedHTTPParser = true;
}
resource[reused] = true;
}
});
asyncHook.enable();
const server = http.createServer(function(req, res) {
res.end();
});
const PORT = 3000;
const url = 'http://127.0.0.1:' + PORT;
server.listen(PORT, common.mustCall(() => {
http.get(url, common.mustCall(() => {
server.close(common.mustCall(() => {
server.listen(PORT, common.mustCall(() => {
http.get(url, common.mustCall(() => {
server.close(common.mustCall(() => {
assert.strictEqual(reusedHTTPParser, false);
}));
}));
}));
}));
}));
}));

View File

@ -21,7 +21,7 @@ const request = Buffer.from(
);
const parser = new HTTPParser(REQUEST);
const as = hooks.activitiesOfTypes('HTTPPARSER');
const as = hooks.activitiesOfTypes('HTTPINCOMINGMESSAGE');
const httpparser = as[0];
assert.strictEqual(as.length, 1);
@ -47,7 +47,7 @@ process.on('exit', onexit);
function onexit() {
hooks.disable();
hooks.sanityCheck('HTTPPARSER');
hooks.sanityCheck('HTTPINCOMINGMESSAGE');
checkInvocations(httpparser, { init: 1, before: 1, after: 1, destroy: 1 },
'when process exits');
}

View File

@ -26,7 +26,7 @@ const request = Buffer.from(
);
const parser = new HTTPParser(RESPONSE);
const as = hooks.activitiesOfTypes('HTTPPARSER');
const as = hooks.activitiesOfTypes('HTTPCLIENTREQUEST');
const httpparser = as[0];
assert.strictEqual(as.length, 1);
@ -58,7 +58,7 @@ process.on('exit', onexit);
function onexit() {
hooks.disable();
hooks.sanityCheck('HTTPPARSER');
hooks.sanityCheck('HTTPCLIENTREQUEST');
checkInvocations(httpparser, { init: 1, before: 2, after: 2, destroy: 1 },
'when process exits');
}

View File

@ -95,7 +95,7 @@ function expectBody(expected) {
throw new Error('hello world');
};
parser.reinitialize(HTTPParser.REQUEST, false);
parser.initialize(HTTPParser.REQUEST, request);
assert.throws(
() => { parser.execute(request, 0, request.length); },
@ -555,7 +555,7 @@ function expectBody(expected) {
parser[kOnBody] = expectBody('ping');
parser.execute(req1, 0, req1.length);
parser.reinitialize(REQUEST, false);
parser.initialize(REQUEST, req2);
parser[kOnBody] = expectBody('pong');
parser[kOnHeadersComplete] = onHeadersComplete2;
parser.execute(req2, 0, req2.length);

View File

@ -48,6 +48,8 @@ const { getSystemErrorName } = require('util');
if (!common.isMainThread)
delete providers.INSPECTORJSBINDING;
delete providers.KEYPAIRGENREQUEST;
delete providers.HTTPCLIENTREQUEST;
delete providers.HTTPINCOMINGMESSAGE;
const objKeys = Object.keys(providers);
if (objKeys.length > 0)

View File

@ -7,6 +7,7 @@ const common = require('../common');
const assert = require('assert');
const httpCommon = require('_http_common');
const { HTTPParser } = require('_http_common');
const { AsyncResource } = require('async_hooks');
const net = require('net');
const COUNT = httpCommon.parsers.max + 1;
@ -24,7 +25,7 @@ function execAndClose() {
process.stdout.write('.');
const parser = parsers.pop();
parser.reinitialize(HTTPParser.RESPONSE, !!parser.reused);
parser.initialize(HTTPParser.RESPONSE, new AsyncResource('ClientRequest'));
const socket = net.connect(common.PORT);
socket.on('error', (e) => {