worker: add worker.getHeapStatistics()

Adds worker.getHeapStatistics() so that the heap usage of the worker
could be observer from the parent thread.

Signed-off-by: Matteo Collina <hello@matteocollina.com>
PR-URL: https://github.com/nodejs/node/pull/57888
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
Reviewed-By: Darshan Sen <raisinten@gmail.com>
Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
This commit is contained in:
Matteo Collina 2025-04-17 12:30:59 +02:00 committed by GitHub
parent c11c7be36e
commit 33d8e03d9d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 219 additions and 0 deletions

View File

@ -1337,6 +1337,18 @@ If the Worker thread is no longer running, which may occur before the
[`'exit'` event][] is emitted, the returned `Promise` is rejected [`'exit'` event][] is emitted, the returned `Promise` is rejected
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error. immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.
### `worker.getHeapStatistics()`
<!-- YAML
added: REPLACEME
-->
* Returns: {Promise}
This method returns a `Promise` that will resolve to an object identical to [`v8.getHeapStatistics()`][],
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
This methods allows the statistics to be observed from outside the actual thread.
### `worker.performance` ### `worker.performance`
<!-- YAML <!-- YAML
@ -1631,6 +1643,7 @@ thread spawned will spawn another until the application crashes.
[`require('node:worker_threads').workerData`]: #workerworkerdata [`require('node:worker_threads').workerData`]: #workerworkerdata
[`trace_events`]: tracing.md [`trace_events`]: tracing.md
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions [`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
[`v8.getHeapStatistics()`]: v8.md#v8getheapstatistics
[`vm`]: vm.md [`vm`]: vm.md
[`worker.SHARE_ENV`]: #workershare_env [`worker.SHARE_ENV`]: #workershare_env
[`worker.on('message')`]: #event-message_1 [`worker.on('message')`]: #event-message_1

View File

@ -459,6 +459,17 @@ class Worker extends EventEmitter {
}; };
}); });
} }
getHeapStatistics() {
const taker = this[kHandle]?.getHeapStatistics();
return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (handle) => {
resolve(handle);
};
});
}
} }
/** /**

View File

@ -79,6 +79,7 @@ namespace node {
V(SIGINTWATCHDOG) \ V(SIGINTWATCHDOG) \
V(WORKER) \ V(WORKER) \
V(WORKERHEAPSNAPSHOT) \ V(WORKERHEAPSNAPSHOT) \
V(WORKERHEAPSTATISTICS) \
V(WRITEWRAP) \ V(WRITEWRAP) \
V(ZLIB) V(ZLIB)

View File

@ -464,6 +464,7 @@
V(tty_constructor_template, v8::FunctionTemplate) \ V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate) \ V(write_wrap_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \ V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate) V(x509_constructor_template, v8::FunctionTemplate)
#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \ #define PER_REALM_STRONG_PERSISTENT_VALUES(V) \

View File

@ -816,6 +816,116 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
} }
} }
class WorkerHeapStatisticsTaker : public AsyncWrap {
public:
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSTATISTICS) {}
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(WorkerHeapStatisticsTaker)
SET_SELF_SIZE(WorkerHeapStatisticsTaker)
};
void Worker::GetHeapStatistics(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_heap_statistics_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}
// The created WorkerHeapStatisticsTaker is an object owned by main
// thread's Isolate, it can not be accessed by worker thread
std::unique_ptr<BaseObjectPtr<WorkerHeapStatisticsTaker>> taker =
std::make_unique<BaseObjectPtr<WorkerHeapStatisticsTaker>>(
MakeDetachedBaseObject<WorkerHeapStatisticsTaker>(env, wrap));
// Interrupt the worker thread and take a snapshot, then schedule a call
// on the parent thread that turns that snapshot into a readable stream.
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
// We create a unique pointer to HeapStatistics so that the actual object
// it's not copied in the lambda, but only the pointer is.
auto heap_stats = std::make_unique<v8::HeapStatistics>();
worker_env->isolate()->GetHeapStatistics(heap_stats.get());
// Here, the worker thread temporarily owns the WorkerHeapStatisticsTaker
// object.
env->SetImmediateThreadsafe(
[taker = std::move(taker),
heap_stats = std::move(heap_stats)](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
Local<v8::Name> heap_stats_names[] = {
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size"),
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size_executable"),
FIXED_ONE_BYTE_STRING(isolate, "total_physical_size"),
FIXED_ONE_BYTE_STRING(isolate, "total_available_size"),
FIXED_ONE_BYTE_STRING(isolate, "used_heap_size"),
FIXED_ONE_BYTE_STRING(isolate, "heap_size_limit"),
FIXED_ONE_BYTE_STRING(isolate, "malloced_memory"),
FIXED_ONE_BYTE_STRING(isolate, "peak_malloced_memory"),
FIXED_ONE_BYTE_STRING(isolate, "does_zap_garbage"),
FIXED_ONE_BYTE_STRING(isolate, "number_of_native_contexts"),
FIXED_ONE_BYTE_STRING(isolate, "number_of_detached_contexts"),
FIXED_ONE_BYTE_STRING(isolate, "total_global_handles_size"),
FIXED_ONE_BYTE_STRING(isolate, "used_global_handles_size"),
FIXED_ONE_BYTE_STRING(isolate, "external_memory")};
// Define an array of property values
Local<Value> heap_stats_values[] = {
Number::New(isolate, heap_stats->total_heap_size()),
Number::New(isolate, heap_stats->total_heap_size_executable()),
Number::New(isolate, heap_stats->total_physical_size()),
Number::New(isolate, heap_stats->total_available_size()),
Number::New(isolate, heap_stats->used_heap_size()),
Number::New(isolate, heap_stats->heap_size_limit()),
Number::New(isolate, heap_stats->malloced_memory()),
Number::New(isolate, heap_stats->peak_malloced_memory()),
Boolean::New(isolate, heap_stats->does_zap_garbage()),
Number::New(isolate, heap_stats->number_of_native_contexts()),
Number::New(isolate, heap_stats->number_of_detached_contexts()),
Number::New(isolate, heap_stats->total_global_handles_size()),
Number::New(isolate, heap_stats->used_global_handles_size()),
Number::New(isolate, heap_stats->external_memory())};
DCHECK_EQ(arraysize(heap_stats_names), arraysize(heap_stats_values));
// Create the object with the property names and values
Local<Object> stats = Object::New(isolate,
Null(isolate),
heap_stats_names,
heap_stats_values,
arraysize(heap_stats_names));
Local<Value> args[] = {stats};
taker->get()->MakeCallback(
env->ondone_string(), arraysize(args), args);
// implicitly delete `taker`
},
CallbackFlags::kUnrefed);
// Now, the lambda is delivered to the main thread, as a result, the
// WorkerHeapStatisticsTaker object is delivered to the main thread, too.
});
if (scheduled) {
args.GetReturnValue().Set(wrap);
} else {
args.GetReturnValue().Set(Local<Object>());
}
}
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) { void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
Worker* w; Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
@ -996,6 +1106,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot); SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime); SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime); SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
SetConstructorFunction(isolate, target, "Worker", w); SetConstructorFunction(isolate, target, "Worker", w);
} }
@ -1014,6 +1125,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
wst->InstanceTemplate()); wst->InstanceTemplate());
} }
{
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
wst->InstanceTemplate()->SetInternalFieldCount(
WorkerHeapSnapshotTaker::kInternalFieldCount);
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
Local<String> wst_string =
FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapStatisticsTaker");
wst->SetClassName(wst_string);
isolate_data->set_worker_heap_statistics_taker_template(
wst->InstanceTemplate());
}
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort); SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
} }
@ -1079,6 +1204,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::TakeHeapSnapshot);
registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime); registry->Register(Worker::LoopStartTime);
registry->Register(Worker::GetHeapStatistics);
} }
} // anonymous namespace } // anonymous namespace

View File

@ -78,6 +78,8 @@ class Worker : public AsyncWrap {
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args); static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args); static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args); static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetHeapStatistics(
const v8::FunctionCallbackInfo<v8::Value>& args);
private: private:
bool CreateEnvMessagePort(Environment* env); bool CreateEnvMessagePort(Environment* env);

View File

@ -0,0 +1,63 @@
'use strict';
const common = require('../common');
const fixtures = require('../common/fixtures');
common.skipIfInspectorDisabled();
const {
Worker,
isMainThread,
} = require('worker_threads');
if (!isMainThread) {
common.skip('This test only works on a main thread');
}
// Ensures that worker.getHeapStatistics() returns valid data
const assert = require('assert');
if (isMainThread) {
const name = 'Hello Thread';
const worker = new Worker(fixtures.path('worker-name.js'), {
name,
});
worker.once('message', common.mustCall(async (message) => {
const stats = await worker.getHeapStatistics();
const keys = [
`total_heap_size`,
`total_heap_size_executable`,
`total_physical_size`,
`total_available_size`,
`used_heap_size`,
`heap_size_limit`,
`malloced_memory`,
`peak_malloced_memory`,
`does_zap_garbage`,
`number_of_native_contexts`,
`number_of_detached_contexts`,
`total_global_handles_size`,
`used_global_handles_size`,
`external_memory`,
].sort();
assert.deepStrictEqual(keys, Object.keys(stats).sort());
for (const key of keys) {
if (key === 'does_zap_garbage') {
assert.strictEqual(typeof stats[key], 'boolean', `Expected ${key} to be a boolean`);
continue;
}
assert.strictEqual(typeof stats[key], 'number', `Expected ${key} to be a number`);
assert.ok(stats[key] >= 0, `Expected ${key} to be >= 0`);
}
worker.postMessage('done');
}));
worker.once('exit', common.mustCall(async (code) => {
assert.strictEqual(code, 0);
await assert.rejects(worker.getHeapStatistics(), {
code: 'ERR_WORKER_NOT_RUNNING'
});
}));
}

View File

@ -61,6 +61,7 @@ const { getSystemErrorName } = require('util');
delete providers.ELDHISTOGRAM; delete providers.ELDHISTOGRAM;
delete providers.SIGINTWATCHDOG; delete providers.SIGINTWATCHDOG;
delete providers.WORKERHEAPSNAPSHOT; delete providers.WORKERHEAPSNAPSHOT;
delete providers.WORKERHEAPSTATISTICS;
delete providers.BLOBREADER; delete providers.BLOBREADER;
delete providers.RANDOMPRIMEREQUEST; delete providers.RANDOMPRIMEREQUEST;
delete providers.CHECKPRIMEREQUEST; delete providers.CHECKPRIMEREQUEST;

View File

@ -15,6 +15,7 @@ declare namespace InternalWorkerBinding {
unref(): void; unref(): void;
getResourceLimits(): Float64Array; getResourceLimits(): Float64Array;
takeHeapSnapshot(): object; takeHeapSnapshot(): object;
getHeapStatistics(): Promise<object>;
loopIdleTime(): number; loopIdleTime(): number;
loopStartTime(): number; loopStartTime(): number;
} }