Skip to content

Commit

Permalink
Join all threads at end of main thread
Browse files Browse the repository at this point in the history
Require reorganizing the isolates somewhat.

Add a very simple test.
  • Loading branch information
ry authored and bnoordhuis committed Dec 29, 2011
1 parent 4d02e77 commit 2684765
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 82 deletions.
88 changes: 33 additions & 55 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1849,54 +1849,11 @@ static Handle<Value> Binding(const Arguments& args) {
}


struct ThreadInfo {
uv_thread_t thread_;
char** argv_;
int argc_;

ThreadInfo(int argc, char** argv) {
argc_ = argc;
argv_ = new char*[argc_ + 1];

for (int i = 0; i < argc_; ++i) {
size_t size = 1 + strlen(argv[i]);
argv_[i] = new char[size];
memcpy(argv_[i], argv[i], size);
}
argv_[argc_] = NULL;
}

ThreadInfo(Handle<Array> args) {
argc_ = args->Length();
argv_ = new char*[argc_ + 1];

for (int i = 0; i < argc_; ++i) {
String::Utf8Value str(args->Get(i));
size_t size = 1 + strlen(*str);
argv_[i] = new char[size];
memcpy(argv_[i], *str, size);
}
argv_[argc_] = NULL;
}

~ThreadInfo() {
for (int i = 0; i < argc_; ++i) {
delete[] argv_[i];
}
delete argv_;
}
};


static void RunIsolate(void* arg) {
ThreadInfo* ti = reinterpret_cast<ThreadInfo*>(arg);

Isolate* isolate = Isolate::New();

StartThread(isolate, ti->argc_, ti->argv_);
node::Isolate* isolate = reinterpret_cast<node::Isolate*>(arg);
isolate->Enter();
StartThread(isolate, isolate->argc_, isolate->argv_);
isolate->Dispose();

delete ti;
delete isolate;
}

Expand All @@ -1912,10 +1869,23 @@ static Handle<Value> NewIsolate(const Arguments& args) {
Local<Array> argv = args[0].As<Array>();
assert(argv->Length() >= 2);

ThreadInfo* ti = new ThreadInfo(argv);
// Note that isolate lock is aquired in the constructor here. It will not
// be unlocked until RunIsolate starts and calls isolate->Enter().
Isolate* isolate = new node::Isolate();

if (uv_thread_create(&ti->thread_, RunIsolate, ti)) {
delete ti;
// Copy over arguments into isolate
isolate->argc_ = argv->Length();
isolate->argv_ = new char*[isolate->argc_ + 1];
for (int i = 0; i < isolate->argc_; ++i) {
String::Utf8Value str(argv->Get(i));
size_t size = 1 + strlen(*str);
isolate->argv_[i] = new char[size];
memcpy(isolate->argv_[i], *str, size);
}
isolate->argv_[isolate->argc_] = NULL;

if (uv_thread_create(&isolate->tid_, RunIsolate, isolate)) {
delete isolate;
return Null();
}

Expand All @@ -1924,7 +1894,7 @@ static Handle<Value> NewIsolate(const Arguments& args) {

Local<Object> obj = tpl->NewInstance();
obj->SetPointerInInternalField(0, magic_isolate_cookie_);
obj->SetPointerInInternalField(1, ti);
obj->SetPointerInInternalField(1, isolate);

return scope.Close(obj);
}
Expand All @@ -1945,10 +1915,10 @@ static Handle<Value> JoinIsolate(const Arguments& args) {
assert(obj->InternalFieldCount() == 2);
assert(obj->GetPointerFromInternalField(0) == magic_isolate_cookie_);

ThreadInfo* ti = reinterpret_cast<ThreadInfo*>(
Isolate* ti = reinterpret_cast<Isolate*>(
obj->GetPointerFromInternalField(1));

if (uv_thread_join(&ti->thread_))
if (uv_thread_join(&ti->tid_))
return False(); // error
else
return True(); // ok
Expand Down Expand Up @@ -2700,8 +2670,7 @@ void StartThread(node::Isolate* isolate,
char** argv) {
HandleScope scope;

v8::Isolate::Scope isolate_scope(isolate->GetV8Isolate());
v8::Context::Scope context_scope(isolate->GetV8Context());
assert(node::Isolate::GetCurrent() == isolate);

uv_loop_t* loop = isolate->GetLoop();
uv_prepare_init(loop, &prepare_tick_watcher);
Expand Down Expand Up @@ -2787,12 +2756,21 @@ int Start(int argc, char *argv[]) {
v8::V8::Initialize();
v8::HandleScope handle_scope;

// Get the id of the this, the main, thread.
uv_thread_t tid = uv_thread_self();

// Create the main node::Isolate object
node::Isolate::Initialize();
Isolate* isolate = node::Isolate::New();
Isolate* isolate = new node::Isolate();
isolate->tid_ = tid;
isolate->Enter();
StartThread(isolate, argc, argv);
isolate->Dispose();

// The main thread/isolate is done. Wait for all other thread/isolates to
// finish.
node::Isolate::JoinAll();

#ifndef NDEBUG
// Clean up.
V8::Dispose();
Expand Down
65 changes: 43 additions & 22 deletions src/node_isolate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,31 @@ void Isolate::Initialize() {
}


Isolate* Isolate::New() {
return new Isolate();
int Isolate::Count() {
return isolate_count;
}


int Isolate::Count() {
return isolate_count;
void Isolate::JoinAll() {
uv_mutex_lock(&list_lock);

while (ngx_queue_empty(&list_head) == false) {
ngx_queue_t* q = ngx_queue_head(&list_head);
assert(q);
Isolate* isolate = ngx_queue_data(q, Isolate, list_member_);
assert(isolate);

// Unlock the list while we join the thread.
uv_mutex_unlock(&list_lock);

uv_thread_join(&isolate->tid_);

// Relock to check the next element in the list.
uv_mutex_lock(&list_lock);
}

// Unlock the list finally.
uv_mutex_unlock(&list_lock);
}


Expand All @@ -79,19 +97,11 @@ Isolate::Isolate() {

uv_mutex_unlock(&list_lock);

v8_isolate_ = v8::Isolate::GetCurrent();
if (v8_isolate_ == NULL) {
v8_isolate_ = v8::Isolate::New();
v8_isolate_->Enter();
}

v8_isolate_ = v8::Isolate::New();
assert(v8_isolate_->GetData() == NULL);
v8_isolate_->SetData(this);

v8_context_ = v8::Context::New();
v8_context_->Enter();

globals_init(&globals_);
globals_init_ = false;
}


Expand All @@ -112,20 +122,31 @@ void Isolate::AtExit(AtExitCallback callback, void* arg) {
}


void Isolate::Enter() {
v8_isolate_->Enter();

if (v8_context_.IsEmpty()) {
v8_context_ = v8::Context::New();
}
v8_context_->Enter();

if (!globals_init_) {
globals_init_ = true;
globals_init(&globals_);
}

NODE_ISOLATE_CHECK(this);
}


void Isolate::Dispose() {
uv_mutex_lock(&list_lock);

NODE_ISOLATE_CHECK(this);

struct AtExitCallbackInfo* it;
ngx_queue_t* q;

NODE_ISOLATE_CHECK(this);

ngx_queue_foreach(q, &at_exit_callbacks_) {
it = ngx_queue_data(q, struct AtExitCallbackInfo, at_exit_callbacks_);
it->callback_(it->arg_);
delete it;
}
ngx_queue_init(&at_exit_callbacks_);

assert(v8_context_->InContext());
v8_context_->Exit();
Expand Down
25 changes: 20 additions & 5 deletions src/node_isolate.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ namespace node {

class Isolate {
public:
char** argv_;
int argc_;
uv_thread_t tid_;

// Call this before instantiating any Isolate
static void Initialize();
static int Count();

typedef void (*AtExitCallback)(void* arg);

static Isolate* New();
static void JoinAll();

static Isolate* GetCurrent() {
return reinterpret_cast<Isolate*>(v8::Isolate::GetCurrent()->GetData());
Expand All @@ -75,16 +79,26 @@ class Isolate {
*/
void AtExit(AtExitCallback callback, void *arg);

/* Shutdown the isolate. Call this method at thread death. */
void Dispose();

struct globals* Globals();

unsigned int id_;

private:
// This constructor is used for every non-main thread
Isolate();

~Isolate() {
if (argv_) {
delete argv_;
}
}

void Enter();

/* Shutdown the isolate. Call this method at thread death. */
void Dispose();

private:

struct AtExitCallbackInfo {
ngx_queue_t at_exit_callbacks_;
AtExitCallback callback_;
Expand All @@ -101,6 +115,7 @@ class Isolate {

// Global variables for this isolate.
struct globals globals_;
bool globals_init_;
};

} // namespace node
Expand Down
11 changes: 11 additions & 0 deletions test/simple/test-isolates.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
console.log("count: %d", process._countIsolate());

if (process.tid === 1) {
var isolate = process._newIsolate(process.argv);
//process._joinIsolate(isolate);
console.error("master");
console.log("count: %d", process._countIsolate());
} else {
console.error("FUCK YEAH!");
console.log("count: %d", process._countIsolate());
}

0 comments on commit 2684765

Please sign in to comment.