Skip to content

worker: add worker.getHeapStatistics() #57888

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Apr 17, 2025
Merged
13 changes: 13 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -1337,6 +1337,18 @@ If the Worker thread is no longer running, which may occur before the
[`'exit'` event][] is emitted, the returned `Promise` is rejected
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.

### `worker.getHeapStatistics()`

<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

This method returns a `Promise` that will resolve to an object identical to [`v8.getHeapStatistics()`][],
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
This methods allows the statistics to be observed from outside the actual thread.

### `worker.performance`

<!-- YAML
Expand Down Expand Up @@ -1631,6 +1643,7 @@ thread spawned will spawn another until the application crashes.
[`require('node:worker_threads').workerData`]: #workerworkerdata
[`trace_events`]: tracing.md
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
[`v8.getHeapStatistics()`]: v8.md#v8getheapstatistics
[`vm`]: vm.md
[`worker.SHARE_ENV`]: #workershare_env
[`worker.on('message')`]: #event-message_1
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,17 @@ class Worker extends EventEmitter {
};
});
}

getHeapStatistics() {
const taker = this[kHandle]?.getHeapStatistics();

return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (handle) => {
resolve(handle);
};
});
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ namespace node {
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERHEAPSNAPSHOT) \
V(WORKERHEAPSTATISTICS) \
V(WRITEWRAP) \
V(ZLIB)

Expand Down
1 change: 1 addition & 0 deletions src/env_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,7 @@
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)

#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \
Expand Down
126 changes: 126 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,116 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
}
}

class WorkerHeapStatisticsTaker : public AsyncWrap {
public:
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSTATISTICS) {}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(WorkerHeapStatisticsTaker)
SET_SELF_SIZE(WorkerHeapStatisticsTaker)
};

void Worker::GetHeapStatistics(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_heap_statistics_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}

// The created WorkerHeapStatisticsTaker is an object owned by main
// thread's Isolate, it can not be accessed by worker thread
std::unique_ptr<BaseObjectPtr<WorkerHeapStatisticsTaker>> taker =
std::make_unique<BaseObjectPtr<WorkerHeapStatisticsTaker>>(
MakeDetachedBaseObject<WorkerHeapStatisticsTaker>(env, wrap));

// Interrupt the worker thread and take a snapshot, then schedule a call
// on the parent thread that turns that snapshot into a readable stream.
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
// We create a unique pointer to HeapStatistics so that the actual object
// it's not copied in the lambda, but only the pointer is.
auto heap_stats = std::make_unique<v8::HeapStatistics>();
worker_env->isolate()->GetHeapStatistics(heap_stats.get());

// Here, the worker thread temporarily owns the WorkerHeapStatisticsTaker
// object.

env->SetImmediateThreadsafe(
[taker = std::move(taker),
heap_stats = std::move(heap_stats)](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());

AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());

Local<v8::Name> heap_stats_names[] = {
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size"),
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size_executable"),
FIXED_ONE_BYTE_STRING(isolate, "total_physical_size"),
FIXED_ONE_BYTE_STRING(isolate, "total_available_size"),
FIXED_ONE_BYTE_STRING(isolate, "used_heap_size"),
FIXED_ONE_BYTE_STRING(isolate, "heap_size_limit"),
FIXED_ONE_BYTE_STRING(isolate, "malloced_memory"),
FIXED_ONE_BYTE_STRING(isolate, "peak_malloced_memory"),
FIXED_ONE_BYTE_STRING(isolate, "does_zap_garbage"),
FIXED_ONE_BYTE_STRING(isolate, "number_of_native_contexts"),
FIXED_ONE_BYTE_STRING(isolate, "number_of_detached_contexts"),
FIXED_ONE_BYTE_STRING(isolate, "total_global_handles_size"),
FIXED_ONE_BYTE_STRING(isolate, "used_global_handles_size"),
FIXED_ONE_BYTE_STRING(isolate, "external_memory")};

// Define an array of property values
Local<Value> heap_stats_values[] = {
Number::New(isolate, heap_stats->total_heap_size()),
Number::New(isolate, heap_stats->total_heap_size_executable()),
Number::New(isolate, heap_stats->total_physical_size()),
Number::New(isolate, heap_stats->total_available_size()),
Number::New(isolate, heap_stats->used_heap_size()),
Number::New(isolate, heap_stats->heap_size_limit()),
Number::New(isolate, heap_stats->malloced_memory()),
Number::New(isolate, heap_stats->peak_malloced_memory()),
Boolean::New(isolate, heap_stats->does_zap_garbage()),
Number::New(isolate, heap_stats->number_of_native_contexts()),
Number::New(isolate, heap_stats->number_of_detached_contexts()),
Number::New(isolate, heap_stats->total_global_handles_size()),
Number::New(isolate, heap_stats->used_global_handles_size()),
Number::New(isolate, heap_stats->external_memory())};

DCHECK_EQ(arraysize(heap_stats_names), arraysize(heap_stats_values));

// Create the object with the property names and values
Local<Object> stats = Object::New(isolate,
Null(isolate),
heap_stats_names,
heap_stats_values,
arraysize(heap_stats_names));

Local<Value> args[] = {stats};
taker->get()->MakeCallback(
env->ondone_string(), arraysize(args), args);
// implicitly delete `taker`
},
CallbackFlags::kUnrefed);

// Now, the lambda is delivered to the main thread, as a result, the
// WorkerHeapStatisticsTaker object is delivered to the main thread, too.
});

if (scheduled) {
args.GetReturnValue().Set(wrap);
} else {
args.GetReturnValue().Set(Local<Object>());
}
}

void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Expand Down Expand Up @@ -996,6 +1106,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);

SetConstructorFunction(isolate, target, "Worker", w);
}
Expand All @@ -1014,6 +1125,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
wst->InstanceTemplate());
}

{
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);

wst->InstanceTemplate()->SetInternalFieldCount(
WorkerHeapSnapshotTaker::kInternalFieldCount);
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));

Local<String> wst_string =
FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapStatisticsTaker");
wst->SetClassName(wst_string);
isolate_data->set_worker_heap_statistics_taker_template(
wst->InstanceTemplate());
}

SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
}

Expand Down Expand Up @@ -1079,6 +1204,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::TakeHeapSnapshot);
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
registry->Register(Worker::GetHeapStatistics);
}

} // anonymous namespace
Expand Down
2 changes: 2 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class Worker : public AsyncWrap {
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetHeapStatistics(
const v8::FunctionCallbackInfo<v8::Value>& args);

private:
bool CreateEnvMessagePort(Environment* env);
Expand Down
63 changes: 63 additions & 0 deletions test/parallel/test-worker-heap-statistics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
'use strict';

const common = require('../common');
const fixtures = require('../common/fixtures');

common.skipIfInspectorDisabled();

const {
Worker,
isMainThread,
} = require('worker_threads');

if (!isMainThread) {
common.skip('This test only works on a main thread');
}

// Ensures that worker.getHeapStatistics() returns valid data

const assert = require('assert');

if (isMainThread) {
const name = 'Hello Thread';
const worker = new Worker(fixtures.path('worker-name.js'), {
name,
});
worker.once('message', common.mustCall(async (message) => {
const stats = await worker.getHeapStatistics();
const keys = [
`total_heap_size`,
`total_heap_size_executable`,
`total_physical_size`,
`total_available_size`,
`used_heap_size`,
`heap_size_limit`,
`malloced_memory`,
`peak_malloced_memory`,
`does_zap_garbage`,
`number_of_native_contexts`,
`number_of_detached_contexts`,
`total_global_handles_size`,
`used_global_handles_size`,
`external_memory`,
].sort();
assert.deepStrictEqual(keys, Object.keys(stats).sort());
for (const key of keys) {
if (key === 'does_zap_garbage') {
assert.strictEqual(typeof stats[key], 'boolean', `Expected ${key} to be a boolean`);
continue;
}
assert.strictEqual(typeof stats[key], 'number', `Expected ${key} to be a number`);
assert.ok(stats[key] >= 0, `Expected ${key} to be >= 0`);
}

worker.postMessage('done');
}));

worker.once('exit', common.mustCall(async (code) => {
assert.strictEqual(code, 0);
await assert.rejects(worker.getHeapStatistics(), {
code: 'ERR_WORKER_NOT_RUNNING'
});
}));
}
1 change: 1 addition & 0 deletions test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const { getSystemErrorName } = require('util');
delete providers.ELDHISTOGRAM;
delete providers.SIGINTWATCHDOG;
delete providers.WORKERHEAPSNAPSHOT;
delete providers.WORKERHEAPSTATISTICS;
delete providers.BLOBREADER;
delete providers.RANDOMPRIMEREQUEST;
delete providers.CHECKPRIMEREQUEST;
Expand Down
1 change: 1 addition & 0 deletions typings/internalBinding/worker.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ declare namespace InternalWorkerBinding {
unref(): void;
getResourceLimits(): Float64Array;
takeHeapSnapshot(): object;
getHeapStatistics(): Promise<object>;
loopIdleTime(): number;
loopStartTime(): number;
}
Expand Down
Loading