mirror of
https://github.com/nodejs/node.git
synced 2025-04-28 21:46:48 +00:00
dgram: use uv_udp_try_send()
This improves dgram performance by avoiding unnecessary async operations. One issue with this commit is that it seems hard to actually create conditions under which the fallback path to the async case is actually taken, for all supported OS, so an internal CLI option is used for testing that path. Another caveat is that the lack of an async operation means that there are slight timing differences (essentially `nextTick()` rather than `setImmediate()` for the send callback). PR-URL: https://github.com/nodejs/node/pull/29832 Reviewed-By: David Carlier <devnexen@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
parent
28c3a9dd72
commit
afdc3d0d18
@ -29,17 +29,25 @@ function main({ dur, len, num, type, chunks }) {
|
|||||||
|
|
||||||
function onsendConcat() {
|
function onsendConcat() {
|
||||||
if (sent++ % num === 0) {
|
if (sent++ % num === 0) {
|
||||||
for (var i = 0; i < num; i++) {
|
// The setImmediate() is necessary to have event loop progress on OSes
|
||||||
socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend);
|
// that only perform synchronous I/O on nonblocking UDP sockets.
|
||||||
}
|
setImmediate(() => {
|
||||||
|
for (var i = 0; i < num; i++) {
|
||||||
|
socket.send(Buffer.concat(chunk), PORT, '127.0.0.1', onsend);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function onsendMulti() {
|
function onsendMulti() {
|
||||||
if (sent++ % num === 0) {
|
if (sent++ % num === 0) {
|
||||||
for (var i = 0; i < num; i++) {
|
// The setImmediate() is necessary to have event loop progress on OSes
|
||||||
socket.send(chunk, PORT, '127.0.0.1', onsend);
|
// that only perform synchronous I/O on nonblocking UDP sockets.
|
||||||
}
|
setImmediate(() => {
|
||||||
|
for (var i = 0; i < num; i++) {
|
||||||
|
socket.send(chunk, PORT, '127.0.0.1', onsend);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -27,9 +27,13 @@ function main({ dur, len, num, type, chunks }) {
|
|||||||
|
|
||||||
function onsend() {
|
function onsend() {
|
||||||
if (sent++ % num === 0) {
|
if (sent++ % num === 0) {
|
||||||
for (var i = 0; i < num; i++) {
|
// The setImmediate() is necessary to have event loop progress on OSes
|
||||||
socket.send(chunk, PORT, '127.0.0.1', onsend);
|
// that only perform synchronous I/O on nonblocking UDP sockets.
|
||||||
}
|
setImmediate(() => {
|
||||||
|
for (var i = 0; i < num; i++) {
|
||||||
|
socket.send(chunk, PORT, '127.0.0.1', onsend);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,9 +23,13 @@ function main({ dur, len, num, type }) {
|
|||||||
|
|
||||||
function onsend() {
|
function onsend() {
|
||||||
if (sent++ % num === 0) {
|
if (sent++ % num === 0) {
|
||||||
for (var i = 0; i < num; i++) {
|
// The setImmediate() is necessary to have event loop progress on OSes
|
||||||
socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend);
|
// that only perform synchronous I/O on nonblocking UDP sockets.
|
||||||
}
|
setImmediate(() => {
|
||||||
|
for (var i = 0; i < num; i++) {
|
||||||
|
socket.send(chunk, 0, chunk.length, PORT, '127.0.0.1', onsend);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,9 +23,13 @@ function main({ dur, len, num, type }) {
|
|||||||
|
|
||||||
function onsend() {
|
function onsend() {
|
||||||
if (sent++ % num === 0) {
|
if (sent++ % num === 0) {
|
||||||
for (var i = 0; i < num; i++) {
|
// The setImmediate() is necessary to have event loop progress on OSes
|
||||||
socket.send(chunk, PORT, '127.0.0.1', onsend);
|
// that only perform synchronous I/O on nonblocking UDP sockets.
|
||||||
}
|
setImmediate(() => {
|
||||||
|
for (var i = 0; i < num; i++) {
|
||||||
|
socket.send(chunk, PORT, '127.0.0.1', onsend);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -666,6 +666,14 @@ function doSend(ex, self, ip, list, address, port, callback) {
|
|||||||
else
|
else
|
||||||
err = state.handle.send(req, list, list.length, !!callback);
|
err = state.handle.send(req, list, list.length, !!callback);
|
||||||
|
|
||||||
|
if (err >= 1) {
|
||||||
|
// Synchronous finish. The return code is msg_length + 1 so that we can
|
||||||
|
// distinguish between synchronous success and asynchronous success.
|
||||||
|
if (callback)
|
||||||
|
process.nextTick(callback, null, err - 1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (err && callback) {
|
if (err && callback) {
|
||||||
// Don't emit as error, dgram_legacy.js compatibility
|
// Don't emit as error, dgram_legacy.js compatibility
|
||||||
const ex = exceptionWithHostPort(err, 'send', address, port);
|
const ex = exceptionWithHostPort(err, 'send', address, port);
|
||||||
|
@ -453,6 +453,8 @@ EnvironmentOptionsParser::EnvironmentOptionsParser() {
|
|||||||
"write warnings to file instead of stderr",
|
"write warnings to file instead of stderr",
|
||||||
&EnvironmentOptions::redirect_warnings,
|
&EnvironmentOptions::redirect_warnings,
|
||||||
kAllowedInEnvironment);
|
kAllowedInEnvironment);
|
||||||
|
AddOption("--test-udp-no-try-send", "", // For testing only.
|
||||||
|
&EnvironmentOptions::test_udp_no_try_send);
|
||||||
AddOption("--throw-deprecation",
|
AddOption("--throw-deprecation",
|
||||||
"throw an exception on deprecations",
|
"throw an exception on deprecations",
|
||||||
&EnvironmentOptions::throw_deprecation,
|
&EnvironmentOptions::throw_deprecation,
|
||||||
|
@ -135,6 +135,7 @@ class EnvironmentOptions : public Options {
|
|||||||
bool heap_prof = false;
|
bool heap_prof = false;
|
||||||
#endif // HAVE_INSPECTOR
|
#endif // HAVE_INSPECTOR
|
||||||
std::string redirect_warnings;
|
std::string redirect_warnings;
|
||||||
|
bool test_udp_no_try_send = false;
|
||||||
bool throw_deprecation = false;
|
bool throw_deprecation = false;
|
||||||
bool trace_deprecation = false;
|
bool trace_deprecation = false;
|
||||||
bool trace_sync_io = false;
|
bool trace_sync_io = false;
|
||||||
|
@ -429,11 +429,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
|
|||||||
size_t count = args[2].As<Uint32>()->Value();
|
size_t count = args[2].As<Uint32>()->Value();
|
||||||
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
|
const bool have_callback = sendto ? args[5]->IsTrue() : args[3]->IsTrue();
|
||||||
|
|
||||||
SendWrap* req_wrap;
|
|
||||||
{
|
|
||||||
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
|
|
||||||
req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
|
|
||||||
}
|
|
||||||
size_t msg_size = 0;
|
size_t msg_size = 0;
|
||||||
|
|
||||||
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
|
MaybeStackBuffer<uv_buf_t, 16> bufs(count);
|
||||||
@ -448,8 +443,6 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
|
|||||||
msg_size += length;
|
msg_size += length;
|
||||||
}
|
}
|
||||||
|
|
||||||
req_wrap->msg_size = msg_size;
|
|
||||||
|
|
||||||
int err = 0;
|
int err = 0;
|
||||||
struct sockaddr_storage addr_storage;
|
struct sockaddr_storage addr_storage;
|
||||||
sockaddr* addr = nullptr;
|
sockaddr* addr = nullptr;
|
||||||
@ -462,18 +455,47 @@ void UDPWrap::DoSend(const FunctionCallbackInfo<Value>& args, int family) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uv_buf_t* bufs_ptr = *bufs;
|
||||||
|
if (err == 0 && !UNLIKELY(env->options()->test_udp_no_try_send)) {
|
||||||
|
err = uv_udp_try_send(&wrap->handle_, bufs_ptr, count, addr);
|
||||||
|
if (err == UV_ENOSYS || err == UV_EAGAIN) {
|
||||||
|
err = 0;
|
||||||
|
} else if (err >= 0) {
|
||||||
|
size_t sent = err;
|
||||||
|
while (count > 0 && bufs_ptr->len <= sent) {
|
||||||
|
sent -= bufs_ptr->len;
|
||||||
|
bufs_ptr++;
|
||||||
|
count--;
|
||||||
|
}
|
||||||
|
if (count > 0) {
|
||||||
|
CHECK_LT(sent, bufs_ptr->len);
|
||||||
|
bufs_ptr->base += sent;
|
||||||
|
bufs_ptr->len -= sent;
|
||||||
|
} else {
|
||||||
|
CHECK_EQ(static_cast<size_t>(err), msg_size);
|
||||||
|
// + 1 so that the JS side can distinguish 0-length async sends from
|
||||||
|
// 0-length sync sends.
|
||||||
|
args.GetReturnValue().Set(static_cast<uint32_t>(msg_size) + 1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
|
AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(wrap);
|
||||||
|
SendWrap* req_wrap = new SendWrap(env, req_wrap_obj, have_callback);
|
||||||
|
req_wrap->msg_size = msg_size;
|
||||||
|
|
||||||
err = req_wrap->Dispatch(uv_udp_send,
|
err = req_wrap->Dispatch(uv_udp_send,
|
||||||
&wrap->handle_,
|
&wrap->handle_,
|
||||||
*bufs,
|
bufs_ptr,
|
||||||
count,
|
count,
|
||||||
addr,
|
addr,
|
||||||
OnSend);
|
OnSend);
|
||||||
|
if (err)
|
||||||
|
delete req_wrap;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (err)
|
|
||||||
delete req_wrap;
|
|
||||||
|
|
||||||
args.GetReturnValue().Set(err);
|
args.GetReturnValue().Set(err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
// Flags: --test-udp-no-try-send
|
||||||
'use strict';
|
'use strict';
|
||||||
|
|
||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
|
@ -22,7 +22,7 @@ function onsend() {
|
|||||||
client.on('listening', function() {
|
client.on('listening', function() {
|
||||||
port = this.address().port;
|
port = this.address().port;
|
||||||
|
|
||||||
setImmediate(function() {
|
process.nextTick(() => {
|
||||||
async = true;
|
async = true;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
// Flags: --expose-gc --expose-internals --no-warnings
|
// Flags: --expose-gc --expose-internals --no-warnings --test-udp-no-try-send
|
||||||
|
|
||||||
const common = require('../common');
|
const common = require('../common');
|
||||||
const { internalBinding } = require('internal/test/binding');
|
const { internalBinding } = require('internal/test/binding');
|
||||||
|
Loading…
Reference in New Issue
Block a user