mirror of
https://github.com/nodejs/node.git
synced 2025-05-06 21:35:34 +00:00
http_parser: consume StreamBase instance
Consume StreamBase instance and operate on incoming data directly without allocating Buffer instances. Improves performance. PR-URL: https://github.com/nodejs/node/pull/2355 Reviewed-By: Trevor Norris <trev.norris@gmail.com>
This commit is contained in:
parent
d6167689d9
commit
59b91f1447
@ -79,6 +79,8 @@ const STATUS_CODES = exports.STATUS_CODES = {
|
||||
511 : 'Network Authentication Required' // RFC 6585
|
||||
};
|
||||
|
||||
const kOnExecute = HTTPParser.kOnExecute | 0;
|
||||
|
||||
|
||||
function ServerResponse(req) {
|
||||
OutgoingMessage.call(this);
|
||||
@ -317,6 +319,18 @@ function connectionListener(socket) {
|
||||
socket.on('end', socketOnEnd);
|
||||
socket.on('data', socketOnData);
|
||||
|
||||
// We are consuming socket, so it won't get any actual data
|
||||
socket.on('resume', onSocketResume);
|
||||
socket.on('pause', onSocketPause);
|
||||
|
||||
socket.on('drain', socketOnDrain);
|
||||
|
||||
// Override on to unconsume on `data`, `readable` listeners
|
||||
socket.on = socketOnWrap;
|
||||
|
||||
parser.consume(socket._handle._externalStream);
|
||||
parser[kOnExecute] = onParserExecute;
|
||||
|
||||
// TODO(isaacs): Move all these functions out of here
|
||||
function socketOnError(e) {
|
||||
self.emit('clientError', e, this);
|
||||
@ -326,6 +340,19 @@ function connectionListener(socket) {
|
||||
assert(!socket._paused);
|
||||
debug('SERVER socketOnData %d', d.length);
|
||||
var ret = parser.execute(d);
|
||||
|
||||
onParserExecuteCommon(ret, d);
|
||||
}
|
||||
|
||||
function onParserExecute(ret, d) {
|
||||
debug('SERVER socketOnParserExecute %d', ret);
|
||||
onParserExecuteCommon(ret, undefined);
|
||||
|
||||
// Kick-off next ticks
|
||||
setImmediate(function() {});
|
||||
}
|
||||
|
||||
function onParserExecuteCommon(ret, d) {
|
||||
if (ret instanceof Error) {
|
||||
debug('parse error');
|
||||
socket.destroy(ret);
|
||||
@ -335,9 +362,13 @@ function connectionListener(socket) {
|
||||
var req = parser.incoming;
|
||||
debug('SERVER upgrade or connect', req.method);
|
||||
|
||||
if (!d)
|
||||
d = parser.getCurrentBuffer();
|
||||
|
||||
socket.removeListener('data', socketOnData);
|
||||
socket.removeListener('end', socketOnEnd);
|
||||
socket.removeListener('close', serverSocketCloseListener);
|
||||
parser.unconsume(socket._handle._externalStream);
|
||||
parser.finish();
|
||||
freeParser(parser, req, null);
|
||||
parser = null;
|
||||
@ -400,7 +431,6 @@ function connectionListener(socket) {
|
||||
socket.resume();
|
||||
}
|
||||
}
|
||||
socket.on('drain', socketOnDrain);
|
||||
|
||||
function parserOnIncoming(req, shouldKeepAlive) {
|
||||
incoming.push(req);
|
||||
@ -480,3 +510,24 @@ function connectionListener(socket) {
|
||||
}
|
||||
}
|
||||
exports._connectionListener = connectionListener;
|
||||
|
||||
function onSocketResume() {
|
||||
this._handle.readStart();
|
||||
}
|
||||
|
||||
function onSocketPause() {
|
||||
this._handle.readStop();
|
||||
}
|
||||
|
||||
function socketOnWrap(ev, fn) {
|
||||
var res = net.Socket.prototype.on.call(this, ev, fn);
|
||||
if (!this.parser) {
|
||||
this.on = net.Socket.prototype.on;
|
||||
return res;
|
||||
}
|
||||
|
||||
if (ev === 'data' || ev === 'readable')
|
||||
this.parser.unconsume(this._handle._externalStream);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -178,6 +178,7 @@ inline Environment::Environment(v8::Local<v8::Context> context,
|
||||
printed_error_(false),
|
||||
trace_sync_io_(false),
|
||||
debugger_agent_(this),
|
||||
http_parser_buffer_(nullptr),
|
||||
context_(context->GetIsolate(), context) {
|
||||
// We'll be creating new objects so make sure we've entered the context.
|
||||
v8::HandleScope handle_scope(isolate());
|
||||
@ -200,6 +201,7 @@ inline Environment::~Environment() {
|
||||
isolate_data()->Put();
|
||||
|
||||
delete[] heap_statistics_buffer_;
|
||||
delete[] http_parser_buffer_;
|
||||
}
|
||||
|
||||
inline void Environment::CleanupHandles() {
|
||||
@ -338,6 +340,15 @@ inline void Environment::set_heap_statistics_buffer(uint32_t* pointer) {
|
||||
heap_statistics_buffer_ = pointer;
|
||||
}
|
||||
|
||||
inline char* Environment::http_parser_buffer() const {
|
||||
return http_parser_buffer_;
|
||||
}
|
||||
|
||||
inline void Environment::set_http_parser_buffer(char* buffer) {
|
||||
CHECK_EQ(http_parser_buffer_, nullptr); // Should be set only once.
|
||||
http_parser_buffer_ = buffer;
|
||||
}
|
||||
|
||||
inline Environment* Environment::from_cares_timer_handle(uv_timer_t* handle) {
|
||||
return ContainerOf(&Environment::cares_timer_handle_, handle);
|
||||
}
|
||||
|
@ -427,6 +427,9 @@ class Environment {
|
||||
inline uint32_t* heap_statistics_buffer() const;
|
||||
inline void set_heap_statistics_buffer(uint32_t* pointer);
|
||||
|
||||
inline char* http_parser_buffer() const;
|
||||
inline void set_http_parser_buffer(char* buffer);
|
||||
|
||||
inline void ThrowError(const char* errmsg);
|
||||
inline void ThrowTypeError(const char* errmsg);
|
||||
inline void ThrowRangeError(const char* errmsg);
|
||||
@ -524,6 +527,8 @@ class Environment {
|
||||
|
||||
uint32_t* heap_statistics_buffer_ = nullptr;
|
||||
|
||||
char* http_parser_buffer_;
|
||||
|
||||
#define V(PropertyName, TypeName) \
|
||||
v8::Persistent<TypeName> PropertyName ## _;
|
||||
ENVIRONMENT_STRONG_PERSISTENT_PROPERTIES(V)
|
||||
|
@ -6,6 +6,8 @@
|
||||
#include "base-object-inl.h"
|
||||
#include "env.h"
|
||||
#include "env-inl.h"
|
||||
#include "stream_base.h"
|
||||
#include "stream_base-inl.h"
|
||||
#include "util.h"
|
||||
#include "util-inl.h"
|
||||
#include "v8.h"
|
||||
@ -36,6 +38,7 @@ namespace node {
|
||||
using v8::Array;
|
||||
using v8::Boolean;
|
||||
using v8::Context;
|
||||
using v8::EscapableHandleScope;
|
||||
using v8::Exception;
|
||||
using v8::Function;
|
||||
using v8::FunctionCallbackInfo;
|
||||
@ -54,6 +57,7 @@ const uint32_t kOnHeaders = 0;
|
||||
const uint32_t kOnHeadersComplete = 1;
|
||||
const uint32_t kOnBody = 2;
|
||||
const uint32_t kOnMessageComplete = 3;
|
||||
const uint32_t kOnExecute = 4;
|
||||
|
||||
|
||||
#define HTTP_CB(name) \
|
||||
@ -295,7 +299,7 @@ class Parser : public BaseObject {
|
||||
|
||||
|
||||
HTTP_DATA_CB(on_body) {
|
||||
HandleScope scope(env()->isolate());
|
||||
EscapableHandleScope scope(env()->isolate());
|
||||
|
||||
Local<Object> obj = object();
|
||||
Local<Value> cb = obj->Get(kOnBody);
|
||||
@ -303,6 +307,15 @@ class Parser : public BaseObject {
|
||||
if (!cb->IsFunction())
|
||||
return 0;
|
||||
|
||||
// We came from consumed stream
|
||||
if (current_buffer_.IsEmpty()) {
|
||||
// Make sure Buffer will be in parent HandleScope
|
||||
current_buffer_ = scope.Escape(Buffer::Copy(
|
||||
env()->isolate(),
|
||||
current_buffer_data_,
|
||||
current_buffer_len_).ToLocalChecked());
|
||||
}
|
||||
|
||||
Local<Value> argv[3] = {
|
||||
current_buffer_,
|
||||
Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_),
|
||||
@ -374,8 +387,6 @@ class Parser : public BaseObject {
|
||||
|
||||
// var bytesParsed = parser->execute(buffer);
|
||||
static void Execute(const FunctionCallbackInfo<Value>& args) {
|
||||
Environment* env = Environment::GetCurrent(args);
|
||||
|
||||
Parser* parser = Unwrap<Parser>(args.Holder());
|
||||
CHECK(parser->current_buffer_.IsEmpty());
|
||||
CHECK_EQ(parser->current_buffer_len_, 0);
|
||||
@ -390,40 +401,11 @@ class Parser : public BaseObject {
|
||||
// amount of overhead. Nothing else will run while http_parser_execute()
|
||||
// runs, therefore this pointer can be set and used for the execution.
|
||||
parser->current_buffer_ = buffer_obj;
|
||||
parser->current_buffer_len_ = buffer_len;
|
||||
parser->current_buffer_data_ = buffer_data;
|
||||
parser->got_exception_ = false;
|
||||
|
||||
size_t nparsed =
|
||||
http_parser_execute(&parser->parser_, &settings, buffer_data, buffer_len);
|
||||
Local<Value> ret = parser->Execute(buffer_data, buffer_len);
|
||||
|
||||
parser->Save();
|
||||
|
||||
// Unassign the 'buffer_' variable
|
||||
parser->current_buffer_.Clear();
|
||||
parser->current_buffer_len_ = 0;
|
||||
parser->current_buffer_data_ = nullptr;
|
||||
|
||||
// If there was an exception in one of the callbacks
|
||||
if (parser->got_exception_)
|
||||
return;
|
||||
|
||||
Local<Integer> nparsed_obj = Integer::New(env->isolate(), nparsed);
|
||||
// If there was a parse error in one of the callbacks
|
||||
// TODO(bnoordhuis) What if there is an error on EOF?
|
||||
if (!parser->parser_.upgrade && nparsed != buffer_len) {
|
||||
enum http_errno err = HTTP_PARSER_ERRNO(&parser->parser_);
|
||||
|
||||
Local<Value> e = Exception::Error(env->parse_error_string());
|
||||
Local<Object> obj = e->ToObject(env->isolate());
|
||||
obj->Set(env->bytes_parsed_string(), nparsed_obj);
|
||||
obj->Set(env->code_string(),
|
||||
OneByteString(env->isolate(), http_errno_name(err)));
|
||||
|
||||
args.GetReturnValue().Set(e);
|
||||
} else {
|
||||
args.GetReturnValue().Set(nparsed_obj);
|
||||
}
|
||||
if (!ret.IsEmpty())
|
||||
args.GetReturnValue().Set(ret);
|
||||
}
|
||||
|
||||
|
||||
@ -478,7 +460,148 @@ class Parser : public BaseObject {
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
static void Consume(const FunctionCallbackInfo<Value>& args) {
|
||||
Parser* parser = Unwrap<Parser>(args.Holder());
|
||||
Local<External> stream_obj = args[0].As<External>();
|
||||
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
|
||||
CHECK_NE(stream, nullptr);
|
||||
|
||||
stream->Consume();
|
||||
|
||||
parser->prev_alloc_cb_ = stream->alloc_cb();
|
||||
parser->prev_read_cb_ = stream->read_cb();
|
||||
|
||||
stream->set_alloc_cb({ OnAllocImpl, parser });
|
||||
stream->set_read_cb({ OnReadImpl, parser });
|
||||
}
|
||||
|
||||
|
||||
static void Unconsume(const FunctionCallbackInfo<Value>& args) {
|
||||
Parser* parser = Unwrap<Parser>(args.Holder());
|
||||
|
||||
// Already unconsumed
|
||||
if (parser->prev_alloc_cb_.is_empty())
|
||||
return;
|
||||
|
||||
CHECK(args[0]->IsExternal());
|
||||
Local<External> stream_obj = args[0].As<External>();
|
||||
StreamBase* stream = static_cast<StreamBase*>(stream_obj->Value());
|
||||
CHECK_NE(stream, nullptr);
|
||||
|
||||
stream->set_alloc_cb(parser->prev_alloc_cb_);
|
||||
stream->set_read_cb(parser->prev_read_cb_);
|
||||
}
|
||||
|
||||
|
||||
static void GetCurrentBuffer(const FunctionCallbackInfo<Value>& args) {
|
||||
Parser* parser = Unwrap<Parser>(args.Holder());
|
||||
|
||||
Local<Object> ret = Buffer::Copy(
|
||||
parser->env(),
|
||||
parser->current_buffer_data_,
|
||||
parser->current_buffer_len_).ToLocalChecked();
|
||||
|
||||
args.GetReturnValue().Set(ret);
|
||||
}
|
||||
|
||||
protected:
|
||||
static const size_t kAllocBufferSize = 64 * 1024;
|
||||
|
||||
static void OnAllocImpl(size_t suggested_size, uv_buf_t* buf, void* ctx) {
|
||||
Parser* parser = static_cast<Parser*>(ctx);
|
||||
Environment* env = parser->env();
|
||||
|
||||
if (env->http_parser_buffer() == nullptr)
|
||||
env->set_http_parser_buffer(new char[kAllocBufferSize]);
|
||||
|
||||
buf->base = env->http_parser_buffer();
|
||||
buf->len = kAllocBufferSize;
|
||||
}
|
||||
|
||||
|
||||
static void OnReadImpl(ssize_t nread,
|
||||
const uv_buf_t* buf,
|
||||
uv_handle_type pending,
|
||||
void* ctx) {
|
||||
Parser* parser = static_cast<Parser*>(ctx);
|
||||
HandleScope scope(parser->env()->isolate());
|
||||
|
||||
if (nread < 0) {
|
||||
uv_buf_t tmp_buf;
|
||||
tmp_buf.base = nullptr;
|
||||
tmp_buf.len = 0;
|
||||
parser->prev_read_cb_.fn(nread,
|
||||
&tmp_buf,
|
||||
pending,
|
||||
parser->prev_read_cb_.ctx);
|
||||
return;
|
||||
}
|
||||
|
||||
// Ignore, empty reads have special meaning in http parser
|
||||
if (nread == 0)
|
||||
return;
|
||||
|
||||
parser->current_buffer_.Clear();
|
||||
Local<Value> ret = parser->Execute(buf->base, nread);
|
||||
|
||||
// Exception
|
||||
if (ret.IsEmpty())
|
||||
return;
|
||||
|
||||
Local<Object> obj = parser->object();
|
||||
Local<Value> cb = obj->Get(kOnExecute);
|
||||
|
||||
if (!cb->IsFunction())
|
||||
return;
|
||||
|
||||
// Hooks for GetCurrentBuffer
|
||||
parser->current_buffer_len_ = nread;
|
||||
parser->current_buffer_data_ = buf->base;
|
||||
|
||||
cb.As<Function>()->Call(obj, 1, &ret);
|
||||
|
||||
parser->current_buffer_len_ = 0;
|
||||
parser->current_buffer_data_ = nullptr;
|
||||
}
|
||||
|
||||
|
||||
Local<Value> Execute(char* data, size_t len) {
|
||||
EscapableHandleScope scope(env()->isolate());
|
||||
|
||||
current_buffer_len_ = len;
|
||||
current_buffer_data_ = data;
|
||||
got_exception_ = false;
|
||||
|
||||
size_t nparsed =
|
||||
http_parser_execute(&parser_, &settings, data, len);
|
||||
|
||||
Save();
|
||||
|
||||
// Unassign the 'buffer_' variable
|
||||
current_buffer_.Clear();
|
||||
current_buffer_len_ = 0;
|
||||
current_buffer_data_ = nullptr;
|
||||
|
||||
// If there was an exception in one of the callbacks
|
||||
if (got_exception_)
|
||||
return scope.Escape(Local<Value>());
|
||||
|
||||
Local<Integer> nparsed_obj = Integer::New(env()->isolate(), nparsed);
|
||||
// If there was a parse error in one of the callbacks
|
||||
// TODO(bnoordhuis) What if there is an error on EOF?
|
||||
if (!parser_.upgrade && nparsed != len) {
|
||||
enum http_errno err = HTTP_PARSER_ERRNO(&parser_);
|
||||
|
||||
Local<Value> e = Exception::Error(env()->parse_error_string());
|
||||
Local<Object> obj = e->ToObject(env()->isolate());
|
||||
obj->Set(env()->bytes_parsed_string(), nparsed_obj);
|
||||
obj->Set(env()->code_string(),
|
||||
OneByteString(env()->isolate(), http_errno_name(err)));
|
||||
|
||||
return scope.Escape(e);
|
||||
}
|
||||
return scope.Escape(nparsed_obj);
|
||||
}
|
||||
|
||||
Local<Array> CreateHeaders() {
|
||||
// num_values_ is either -1 or the entry # of the last header
|
||||
@ -542,6 +665,8 @@ class Parser : public BaseObject {
|
||||
Local<Object> current_buffer_;
|
||||
size_t current_buffer_len_;
|
||||
char* current_buffer_data_;
|
||||
StreamResource::Callback<StreamResource::AllocCb> prev_alloc_cb_;
|
||||
StreamResource::Callback<StreamResource::ReadCb> prev_read_cb_;
|
||||
static const struct http_parser_settings settings;
|
||||
};
|
||||
|
||||
@ -581,6 +706,8 @@ void InitHttpParser(Handle<Object> target,
|
||||
Integer::NewFromUnsigned(env->isolate(), kOnBody));
|
||||
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnMessageComplete"),
|
||||
Integer::NewFromUnsigned(env->isolate(), kOnMessageComplete));
|
||||
t->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "kOnExecute"),
|
||||
Integer::NewFromUnsigned(env->isolate(), kOnExecute));
|
||||
|
||||
Local<Array> methods = Array::New(env->isolate());
|
||||
#define V(num, name, string) \
|
||||
@ -595,6 +722,9 @@ void InitHttpParser(Handle<Object> target,
|
||||
env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
|
||||
env->SetProtoMethod(t, "pause", Parser::Pause<true>);
|
||||
env->SetProtoMethod(t, "resume", Parser::Pause<false>);
|
||||
env->SetProtoMethod(t, "consume", Parser::Consume);
|
||||
env->SetProtoMethod(t, "unconsume", Parser::Unconsume);
|
||||
env->SetProtoMethod(t, "getCurrentBuffer", Parser::GetCurrentBuffer);
|
||||
|
||||
target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"),
|
||||
t->GetFunction());
|
||||
|
30
test/parallel/test-http-server-unconsume.js
Normal file
30
test/parallel/test-http-server-unconsume.js
Normal file
@ -0,0 +1,30 @@
|
||||
'use strict';
|
||||
var common = require('../common');
|
||||
var assert = require('assert');
|
||||
var http = require('http');
|
||||
var net = require('net');
|
||||
|
||||
var received = '';
|
||||
|
||||
var server = http.createServer(function(req, res) {
|
||||
res.writeHead(200);
|
||||
res.end();
|
||||
|
||||
req.socket.on('data', function(data) {
|
||||
received += data;
|
||||
});
|
||||
|
||||
server.close();
|
||||
}).listen(common.PORT, function() {
|
||||
var socket = net.connect(common.PORT, function() {
|
||||
socket.write('PUT / HTTP/1.1\r\n\r\n');
|
||||
|
||||
socket.once('data', function() {
|
||||
socket.end('hello world');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
process.on('exit', function() {
|
||||
assert.equal(received, 'hello world');
|
||||
});
|
Loading…
Reference in New Issue
Block a user