Skip to content

feat: Allow passing state from tracked threads #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ build/
lib/
/*.tgz
test/yarn.lock
test/package.json
23 changes: 16 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ registerThread();
Watchdog thread:

```ts
const { captureStackTrace } = require("@sentry-internal/node-native-stacktrace");
const {
captureStackTrace,
} = require("@sentry-internal/node-native-stacktrace");

const stacks = captureStackTrace();
console.log(stacks);
Expand Down Expand Up @@ -87,15 +89,20 @@ In the main or worker threads if you call `registerThread()` regularly, times
are recorded.

```ts
const { registerThread } = require("@sentry-internal/node-native-stacktrace");
const {
registerThread,
threadPoll,
} = require("@sentry-internal/node-native-stacktrace");

registerThread();

setInterval(() => {
registerThread();
threadPoll({ optional_state: "some_value" });
}, 200);
```

In the watchdog thread you can call `getThreadsLastSeen()` to get how long it's
been in milliseconds since each thread registered.
been in milliseconds since each thread polled.

If any thread has exceeded a threshold, you can call `captureStackTrace()` to
get the stack traces for all threads.
Expand All @@ -111,11 +118,13 @@ const THRESHOLD = 1000; // 1 second
setInterval(() => {
for (const [thread, time] in Object.entries(getThreadsLastSeen())) {
if (time > THRESHOLD) {
const stacks = captureStackTrace();
const blockedThread = stacks[thread];
const threads = captureStackTrace();
const blockedThread = threads[thread];
const { frames, state } = blockedThread;
console.log(
`Thread '${thread}' blocked more than ${THRESHOLD}ms`,
blockedThread,
frames,
state,
);
}
}
Expand Down
116 changes: 96 additions & 20 deletions module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct ThreadInfo {
std::string thread_name;
// Last time this thread was seen in milliseconds since epoch
milliseconds last_seen;
// Some JSON serialized state for the thread
std::string state;
};

static std::mutex threads_mutex;
Expand All @@ -32,6 +34,12 @@ struct JsStackFrame {
// Type alias for a vector of JsStackFrame
using JsStackTrace = std::vector<JsStackFrame>;

struct ThreadResult {
std::string thread_name;
std::string state;
JsStackTrace stack_frames;
};

// Function to be called when an isolate's execution is interrupted
static void ExecutionInterrupted(Isolate *isolate, void *data) {
auto promise = static_cast<std::promise<JsStackTrace> *>(data);
Expand Down Expand Up @@ -91,7 +99,6 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
auto capture_from_isolate = args.GetIsolate();
auto current_context = capture_from_isolate->GetCurrentContext();

using ThreadResult = std::tuple<std::string, JsStackTrace>;
std::vector<std::future<ThreadResult>> futures;

{
Expand All @@ -100,35 +107,38 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
if (thread_isolate == capture_from_isolate)
continue;
auto thread_name = thread_info.thread_name;
auto state = thread_info.state;

futures.emplace_back(std::async(
std::launch::async,
[thread_name](Isolate *isolate) -> ThreadResult {
return std::make_tuple(thread_name, CaptureStackTrace(isolate));
[thread_name, state](Isolate *isolate) -> ThreadResult {
return ThreadResult{thread_name, state, CaptureStackTrace(isolate)};
},
thread_isolate));
}
}

Local<Object> result = Object::New(capture_from_isolate);
Local<Object> output = Object::New(capture_from_isolate);

for (auto &future : futures) {
auto [thread_name, frames] = future.get();
auto key = String::NewFromUtf8(capture_from_isolate, thread_name.c_str(),
NewStringType::kNormal)
.ToLocalChecked();

Local<Array> jsFrames = Array::New(capture_from_isolate, frames.size());
for (size_t i = 0; i < frames.size(); ++i) {
const auto &f = frames[i];
auto result = future.get();
auto key =
String::NewFromUtf8(capture_from_isolate, result.thread_name.c_str(),
NewStringType::kNormal)
.ToLocalChecked();

Local<Array> jsFrames =
Array::New(capture_from_isolate, result.stack_frames.size());
for (size_t i = 0; i < result.stack_frames.size(); ++i) {
const auto &frame = result.stack_frames[i];
Local<Object> frameObj = Object::New(capture_from_isolate);
frameObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "function",
NewStringType::kInternalized)
.ToLocalChecked(),
String::NewFromUtf8(capture_from_isolate,
f.function_name.c_str(),
frame.function_name.c_str(),
NewStringType::kNormal)
.ToLocalChecked())
.Check();
Expand All @@ -137,7 +147,8 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
String::NewFromUtf8(capture_from_isolate, "filename",
NewStringType::kInternalized)
.ToLocalChecked(),
String::NewFromUtf8(capture_from_isolate, f.filename.c_str(),
String::NewFromUtf8(capture_from_isolate,
frame.filename.c_str(),
NewStringType::kNormal)
.ToLocalChecked())
.Check();
Expand All @@ -146,23 +157,52 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
String::NewFromUtf8(capture_from_isolate, "lineno",
NewStringType::kInternalized)
.ToLocalChecked(),
Integer::New(capture_from_isolate, f.lineno))
Integer::New(capture_from_isolate, frame.lineno))
.Check();
frameObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "colno",
NewStringType::kInternalized)
.ToLocalChecked(),
Integer::New(capture_from_isolate, f.colno))
Integer::New(capture_from_isolate, frame.colno))
.Check();
jsFrames->Set(current_context, static_cast<uint32_t>(i), frameObj)
.Check();
}

result->Set(current_context, key, jsFrames).Check();
// Create a thread object with a 'frames' property and optional 'state'
Local<Object> threadObj = Object::New(capture_from_isolate);
threadObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "frames",
NewStringType::kInternalized)
.ToLocalChecked(),
jsFrames)
.Check();

if (!result.state.empty()) {
v8::MaybeLocal<v8::String> stateStr = v8::String::NewFromUtf8(
capture_from_isolate, result.state.c_str(), NewStringType::kNormal);
if (!stateStr.IsEmpty()) {
v8::MaybeLocal<v8::Value> maybeStateVal =
v8::JSON::Parse(current_context, stateStr.ToLocalChecked());
v8::Local<v8::Value> stateVal;
if (maybeStateVal.ToLocal(&stateVal)) {
threadObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "state",
NewStringType::kInternalized)
.ToLocalChecked(),
stateVal)
.Check();
}
}
}

output->Set(current_context, key, threadObj).Check();
}

args.GetReturnValue().Set(result);
args.GetReturnValue().Set(output);
}

// Cleanup function to remove the thread from the map when the isolate is
Expand Down Expand Up @@ -194,13 +234,39 @@ void RegisterThread(const FunctionCallbackInfo<Value> &args) {
std::lock_guard<std::mutex> lock(threads_mutex);
auto found = threads.find(isolate);
if (found == threads.end()) {
threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero()});
threads.emplace(isolate,
ThreadInfo{thread_name, milliseconds::zero(), ""});
// Register a cleanup hook to remove this thread when the isolate is
// destroyed
node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate);
}
}
}

// Function to track a thread and set its state
void ThreadPoll(const FunctionCallbackInfo<Value> &args) {
auto isolate = args.GetIsolate();
auto context = isolate->GetCurrentContext();

std::string state_str;
if (args.Length() == 1 && args[0]->IsValue()) {
MaybeLocal<String> maybe_json = v8::JSON::Stringify(context, args[0]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if a return a huge json? Should we account for size limits in some way?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about this but I think the worse case is too much memory use or OOM.

if (!maybe_json.IsEmpty()) {
v8::String::Utf8Value utf8_state(isolate, maybe_json.ToLocalChecked());
state_str = *utf8_state ? *utf8_state : "";
} else {
state_str = "";
}
} else {
state_str = "";
}

{
std::lock_guard<std::mutex> lock(threads_mutex);
auto found = threads.find(isolate);
if (found != threads.end()) {
auto &thread_info = found->second;
thread_info.thread_name = thread_name;
thread_info.state = state_str;
thread_info.last_seen =
duration_cast<milliseconds>(system_clock::now().time_since_epoch());
}
Expand Down Expand Up @@ -257,6 +323,16 @@ NODE_MODULE_INITIALIZER(Local<Object> exports, Local<Value> module,
.ToLocalChecked())
.Check();

exports
->Set(context,
String::NewFromUtf8(isolate, "threadPoll",
NewStringType::kInternalized)
.ToLocalChecked(),
FunctionTemplate::New(isolate, ThreadPoll)
->GetFunction(context)
.ToLocalChecked())
.Check();

exports
->Set(context,
String::NewFromUtf8(isolate, "getThreadsLastSeen",
Expand Down
25 changes: 22 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const arch = process.env['BUILD_ARCH'] || _arch();
const abi = getAbi(versions.node, 'node');
const identifier = [platform, arch, stdlib, abi].filter(c => c !== undefined && c !== null).join('-');

type Thread<S = unknown> = {
frames: StackFrame[];
state?: S
}

type StackFrame = {
function: string;
filename: string;
Expand All @@ -20,7 +25,8 @@ type StackFrame = {

interface Native {
registerThread(threadName: string): void;
captureStackTrace(): Record<string, StackFrame[]>;
threadPoll(state?: object): void;
captureStackTrace<S = unknown>(): Record<string, Thread<S>>;
getThreadsLastSeen(): Record<string, number>;
}

Expand Down Expand Up @@ -177,11 +183,24 @@ export function registerThread(threadName: string = String(threadId)): void {
native.registerThread(threadName);
}

/**
* Tells the native module that the thread is still running and updates the state.
*
* @param state Optional state to pass to the native module.
*/
export function threadPoll(state?: object): void {
if (typeof state === 'object') {
native.threadPoll(state);
} else {
native.threadPoll();
}
}

/**
* Captures stack traces for all registered threads.
*/
export function captureStackTrace(): Record<string, StackFrame[]> {
return native.captureStackTrace();
export function captureStackTrace<S = unknown>(): Record<string, Thread<S>> {
return native.captureStackTrace<S>();
}

/**
Expand Down
10 changes: 6 additions & 4 deletions test/e2e.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {

const stacks = JSON.parse(result.stdout.toString());

expect(stacks['0']).toEqual(expect.arrayContaining([
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
{
function: 'pbkdf2Sync',
filename: expect.any(String),
Expand All @@ -34,7 +34,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
},
]));

expect(stacks['2']).toEqual(expect.arrayContaining([
expect(stacks['2'].frames).toEqual(expect.arrayContaining([
{
function: 'pbkdf2Sync',
filename: expect.any(String),
Expand Down Expand Up @@ -64,7 +64,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {

const stacks = JSON.parse(result.stdout.toString());

expect(stacks['0']).toEqual(expect.arrayContaining([
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
{
function: 'pbkdf2Sync',
filename: expect.any(String),
Expand All @@ -85,6 +85,8 @@ describe('e2e Tests', { timeout: 20000 }, () => {
},
]));

expect(stacks['2'].length).toEqual(1);
expect(stacks['0'].state).toEqual({ some_property: 'some_value' });

expect(stacks['2'].frames.length).toEqual(1);
});
});
7 changes: 0 additions & 7 deletions test/package.json

This file was deleted.

6 changes: 4 additions & 2 deletions test/stalled.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
const { Worker } = require('node:worker_threads');
const { longWork } = require('./long-work.js');
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');

registerThread();

setInterval(() => {
registerThread();
threadPoll({ some_property: 'some_value' });
}, 200).unref();

const watchdog = new Worker('./test/stalled-watchdog.js');
Expand Down
8 changes: 4 additions & 4 deletions test/worker-do-nothing.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { longWork } = require('./long-work');
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');

registerThread();

setInterval(() => {
registerThread();
threadPoll();
}, 200);