Skip to content

Commit 3725695

Browse files
committed
worker: add cpuUsage for worker
1 parent 049664b commit 3725695

File tree

9 files changed

+247
-1
lines changed

9 files changed

+247
-1
lines changed

doc/api/worker_threads.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1613,6 +1613,19 @@ added: v10.5.0
16131613
The `'online'` event is emitted when the worker thread has started executing
16141614
JavaScript code.
16151615

1616+
### `worker.cpuUsage()`
1617+
1618+
<!-- YAML
1619+
added:
1620+
- REPLACEME
1621+
-->
1622+
1623+
* Returns: {Promise}
1624+
1625+
This method returns a `Promise` that will resolve to an object identical to [`process.threadCpuUsage()`][],
1626+
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
1627+
This methods allows the statistics to be observed from outside the actual thread.
1628+
16161629
### `worker.getHeapSnapshot([options])`
16171630

16181631
<!-- YAML
@@ -1975,6 +1988,7 @@ thread spawned will spawn another until the application crashes.
19751988
[`process.stderr`]: process.md#processstderr
19761989
[`process.stdin`]: process.md#processstdin
19771990
[`process.stdout`]: process.md#processstdout
1991+
[`process.threadCpuUsage()`]: process.md#processthreadcpuusage
19781992
[`process.title`]: process.md#processtitle
19791993
[`require('node:worker_threads').isMainThread`]: #workerismainthread
19801994
[`require('node:worker_threads').parentPort.on('message')`]: #event-message

lib/internal/worker.js

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const {
88
Float64Array,
99
FunctionPrototypeBind,
1010
MathMax,
11+
NumberMAX_SAFE_INTEGER,
1112
ObjectEntries,
1213
Promise,
1314
PromiseResolve,
@@ -41,6 +42,7 @@ const {
4142
ERR_WORKER_INVALID_EXEC_ARGV,
4243
ERR_INVALID_ARG_TYPE,
4344
ERR_INVALID_ARG_VALUE,
45+
ERR_OPERATION_FAILED,
4446
} = errorCodes;
4547

4648
const workerIo = require('internal/worker/io');
@@ -61,7 +63,7 @@ const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker
6163
const { deserializeError } = require('internal/error_serdes');
6264
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6365
const { kEmptyObject } = require('internal/util');
64-
const { validateArray, validateString } = require('internal/validators');
66+
const { validateArray, validateString, validateObject, validateNumber } = require('internal/validators');
6567
const {
6668
throwIfBuildingSnapshot,
6769
} = require('internal/v8/startup_snapshot');
@@ -466,6 +468,37 @@ class Worker extends EventEmitter {
466468
};
467469
});
468470
}
471+
472+
cpuUsage(prev) {
473+
if (prev) {
474+
validateObject(prev, 'prev');
475+
validateNumber(prev.user, 'prev.user', 0, NumberMAX_SAFE_INTEGER);
476+
validateNumber(prev.system, 'prev.system', 0, NumberMAX_SAFE_INTEGER);
477+
}
478+
if (process.platform === 'sunos') {
479+
throw new ERR_OPERATION_FAILED('worker.cpuUsage() is not available on SunOS');
480+
}
481+
const taker = this[kHandle]?.cpuUsage();
482+
return new Promise((resolve, reject) => {
483+
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
484+
taker.ondone = (err, current) => {
485+
if (err !== null) {
486+
return reject(err);
487+
}
488+
if (prev) {
489+
resolve({
490+
user: current.user - prev.user,
491+
system: current.system - prev.system,
492+
});
493+
} else {
494+
resolve({
495+
user: current.user,
496+
system: current.system,
497+
});
498+
}
499+
};
500+
});
501+
}
469502
}
470503

471504
/**

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ namespace node {
7878
V(UDPWRAP) \
7979
V(SIGINTWATCHDOG) \
8080
V(WORKER) \
81+
V(WORKERCPUUSAGE) \
8182
V(WORKERHEAPSNAPSHOT) \
8283
V(WORKERHEAPSTATISTICS) \
8384
V(WRITEWRAP) \

src/env_properties.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@
470470
V(tcp_constructor_template, v8::FunctionTemplate) \
471471
V(tty_constructor_template, v8::FunctionTemplate) \
472472
V(write_wrap_template, v8::ObjectTemplate) \
473+
V(worker_cpu_usage_taker_template, v8::ObjectTemplate) \
473474
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
474475
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
475476
V(x509_constructor_template, v8::FunctionTemplate)

src/node_worker.cc

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,97 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
810810
}
811811
}
812812

813+
814+
class WorkerCpuUsageTaker : public AsyncWrap {
815+
public:
816+
WorkerCpuUsageTaker(Environment* env, Local<Object> obj)
817+
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERCPUUSAGE) {}
818+
819+
SET_NO_MEMORY_INFO()
820+
SET_MEMORY_INFO_NAME(WorkerCpuUsageTaker)
821+
SET_SELF_SIZE(WorkerCpuUsageTaker)
822+
};
823+
824+
void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
825+
Worker* w;
826+
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
827+
828+
Environment* env = w->env();
829+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
830+
Local<Object> wrap;
831+
if (!env->worker_cpu_usage_taker_template()
832+
->NewInstance(env->context())
833+
.ToLocal(&wrap)) {
834+
return;
835+
}
836+
837+
std::unique_ptr<BaseObjectPtr<WorkerCpuUsageTaker>> taker =
838+
std::make_unique<BaseObjectPtr<WorkerCpuUsageTaker>>(
839+
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap));
840+
841+
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
842+
env](Environment* worker_env) mutable {
843+
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
844+
int err = uv_getrusage_thread(cpu_usage_stats.get());
845+
846+
env->SetImmediateThreadsafe(
847+
[
848+
taker = std::move(taker),
849+
cpu_usage_stats = std::move(cpu_usage_stats),
850+
err = err
851+
](Environment* env) mutable {
852+
Isolate* isolate = env->isolate();
853+
HandleScope handle_scope(isolate);
854+
Context::Scope context_scope(env->context());
855+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
856+
857+
Local<Value> argv[] = {
858+
Null(isolate),
859+
Undefined(isolate),
860+
};
861+
862+
if (err) {
863+
argv[0] = UVException(isolate,
864+
err,
865+
"uv_getrusage_thread",
866+
nullptr,
867+
nullptr,
868+
nullptr);
869+
} else {
870+
Local<v8::Name> names[] = {
871+
FIXED_ONE_BYTE_STRING(isolate, "user"),
872+
FIXED_ONE_BYTE_STRING(isolate, "system"),
873+
};
874+
Local<Value> values[] = {
875+
Number::New(isolate,
876+
1e6 *
877+
cpu_usage_stats->ru_utime.tv_sec +
878+
cpu_usage_stats->ru_utime.tv_usec),
879+
Number::New(isolate,
880+
1e6 *
881+
cpu_usage_stats->ru_stime.tv_sec +
882+
cpu_usage_stats->ru_stime.tv_usec),
883+
};
884+
argv[1] = Object::New(isolate,
885+
Null(isolate),
886+
names,
887+
values,
888+
arraysize(names));
889+
}
890+
891+
taker->get()->MakeCallback(
892+
env->ondone_string(), arraysize(argv), argv);
893+
},
894+
CallbackFlags::kUnrefed);
895+
});
896+
897+
if (scheduled) {
898+
args.GetReturnValue().Set(wrap);
899+
} else {
900+
args.GetReturnValue().Set(Local<Object>());
901+
}
902+
}
903+
813904
class WorkerHeapStatisticsTaker : public AsyncWrap {
814905
public:
815906
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
@@ -1101,6 +1192,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
11011192
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
11021193
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
11031194
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
1195+
SetProtoMethod(isolate, w, "cpuUsage", Worker::CpuUsage);
11041196

11051197
SetConstructorFunction(isolate, target, "Worker", w);
11061198
}
@@ -1133,6 +1225,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
11331225
wst->InstanceTemplate());
11341226
}
11351227

1228+
{
1229+
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
1230+
1231+
wst->InstanceTemplate()->SetInternalFieldCount(
1232+
WorkerCpuUsageTaker::kInternalFieldCount);
1233+
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
1234+
1235+
Local<String> wst_string =
1236+
FIXED_ONE_BYTE_STRING(isolate, "WorkerCpuUsageTaker");
1237+
wst->SetClassName(wst_string);
1238+
isolate_data->set_worker_cpu_usage_taker_template(
1239+
wst->InstanceTemplate());
1240+
}
1241+
11361242
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
11371243
}
11381244

@@ -1199,6 +1305,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
11991305
registry->Register(Worker::LoopIdleTime);
12001306
registry->Register(Worker::LoopStartTime);
12011307
registry->Register(Worker::GetHeapStatistics);
1308+
registry->Register(Worker::CpuUsage);
12021309
}
12031310

12041311
} // anonymous namespace

src/node_worker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ class Worker : public AsyncWrap {
8080
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
8181
static void GetHeapStatistics(
8282
const v8::FunctionCallbackInfo<v8::Value>& args);
83+
static void CpuUsage(
84+
const v8::FunctionCallbackInfo<v8::Value>& args);
8385

8486
private:
8587
bool CreateEnvMessagePort(Environment* env);
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
'use strict';
2+
const common = require('../common');
3+
const { isSunOS } = require('../common');
4+
const assert = require('assert');
5+
const {
6+
Worker,
7+
} = require('worker_threads');
8+
9+
function validate(result) {
10+
assert.ok(typeof result == 'object' && result !== null);
11+
assert.ok(result.user >= 0);
12+
assert.ok(result.system >= 0);
13+
assert.ok(Number.isFinite(result.user));
14+
assert.ok(Number.isFinite(result.system));
15+
}
16+
17+
function check(worker) {
18+
[
19+
-1,
20+
1.1,
21+
NaN,
22+
undefined,
23+
{},
24+
[],
25+
null,
26+
function() {},
27+
Symbol(),
28+
true,
29+
Infinity,
30+
{ user: -1, system: 1 },
31+
{ user: 1, system: -1 },
32+
].forEach((value) => {
33+
try {
34+
worker.cpuUsage(value);
35+
} catch (e) {
36+
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
37+
}
38+
});
39+
}
40+
41+
const worker = new Worker(`
42+
const { parentPort } = require('worker_threads');
43+
parentPort.on('message', (msg) => {
44+
if (msg === 'exit') {
45+
process.exit(0);
46+
}
47+
});
48+
`, { eval: true });
49+
50+
// See test-process-threadCpuUsage-main-thread.js
51+
if (isSunOS) {
52+
assert.throws(
53+
() => worker.cpuUsage(),
54+
{
55+
code: 'ERR_OPERATION_FAILED',
56+
name: 'Error',
57+
message: 'Operation failed: worker.cpuUsage() is not available on SunOS'
58+
}
59+
);
60+
worker.postMessage('exit');
61+
} else {
62+
worker.on('online', common.mustCall(async () => {
63+
check(worker);
64+
65+
const prev = await worker.cpuUsage();
66+
validate(prev);
67+
68+
const curr = await worker.cpuUsage();
69+
validate(curr);
70+
71+
assert.ok(curr.user >= prev.user);
72+
assert.ok(curr.system >= prev.system);
73+
74+
const delta = await worker.cpuUsage(curr);
75+
validate(delta);
76+
77+
worker.postMessage('exit');
78+
}));
79+
80+
worker.once('exit', common.mustCall(async (code) => {
81+
assert.strictEqual(code, 0);
82+
await assert.rejects(worker.cpuUsage(), {
83+
code: 'ERR_WORKER_NOT_RUNNING'
84+
});
85+
}));
86+
}

test/sequential/test-async-wrap-getasyncid.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ const { getSystemErrorName } = require('util');
6262
delete providers.SIGINTWATCHDOG;
6363
delete providers.WORKERHEAPSNAPSHOT;
6464
delete providers.WORKERHEAPSTATISTICS;
65+
delete providers.WORKERCPUUSAGE;
6566
delete providers.BLOBREADER;
6667
delete providers.RANDOMPRIMEREQUEST;
6768
delete providers.CHECKPRIMEREQUEST;

typings/internalBinding/worker.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ declare namespace InternalWorkerBinding {
1616
getResourceLimits(): Float64Array;
1717
takeHeapSnapshot(): object;
1818
getHeapStatistics(): Promise<object>;
19+
cpuUsage(): Promise<object>;
1920
loopIdleTime(): number;
2021
loopStartTime(): number;
2122
}

0 commit comments

Comments
 (0)