Skip to content

Commit

Permalink
src: simplify v8 thread pool implementation
Browse files Browse the repository at this point in the history
This commit drops the semaphore in exchange for a second condition
variable and makes the task ring an array member instead of allocating
it on the heap.  That in turn makes size calculations a little easier
because of the array's fixed size.

PR-URL: node-forward/node#34
Reviewed-By: Fedor Indutny <fedor@indutny.com>
  • Loading branch information
bnoordhuis committed Oct 28, 2014
1 parent 78e38f5 commit 3543c55
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 42 deletions.
75 changes: 42 additions & 33 deletions src/node_v8_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,64 +95,73 @@ void Platform::WorkerBody(void* arg) {
}


TaskQueue::TaskQueue() {
int err;

for (size_t i = 0; i < ARRAY_SIZE(ring_); i += 1)
ring_[i] = nullptr;

read_off_ = 0;
write_off_ = 0;

err = uv_sem_init(&sem_, 0);
CHECK_EQ(err, 0);

err = uv_cond_init(&cond_);
CHECK_EQ(err, 0);

err = uv_mutex_init(&mutex_);
CHECK_EQ(err, 0);
TaskQueue::TaskQueue() : read_off_(0), write_off_(0) {
CHECK_EQ(0, uv_cond_init(&read_cond_));
CHECK_EQ(0, uv_cond_init(&write_cond_));
CHECK_EQ(0, uv_mutex_init(&mutex_));
}


TaskQueue::~TaskQueue() {
uv_mutex_lock(&mutex_);
CHECK_EQ(read_off_, write_off_);
uv_sem_destroy(&sem_);
uv_cond_destroy(&cond_);
uv_mutex_unlock(&mutex_);
uv_cond_destroy(&read_cond_);
uv_cond_destroy(&write_cond_);
uv_mutex_destroy(&mutex_);
}


void TaskQueue::Push(Task* task) {
uv_mutex_lock(&mutex_);

// Wait for empty cell
while (ring_[write_off_] != nullptr)
uv_cond_wait(&cond_, &mutex_);
while (can_write() == false)
uv_cond_wait(&write_cond_, &mutex_); // Wait until there is a free slot.

ring_[write_off_] = task;
write_off_++;
write_off_ &= kRingMask;
write_off_ = next(write_off_);
uv_cond_signal(&read_cond_);
uv_mutex_unlock(&mutex_);

uv_sem_post(&sem_);
}


Task* TaskQueue::Shift() {
uv_sem_wait(&sem_);

uv_mutex_lock(&mutex_);
Task* task = ring_[read_off_];
ring_[read_off_] = nullptr;
uv_cond_signal(&cond_);

read_off_++;
read_off_ &= kRingMask;
while (can_read() == false)
uv_cond_wait(&read_cond_, &mutex_);

Task* task = ring_[read_off_];
if (can_write() == false)
uv_cond_signal(&write_cond_); // Signal waiters that we freed up a slot.
read_off_ = next(read_off_);
uv_mutex_unlock(&mutex_);

return task;
}


unsigned int TaskQueue::next(unsigned int n) {
return (n + 1) % ARRAY_SIZE(TaskQueue::ring_);
}


bool TaskQueue::can_read() const {
return read_off_ != write_off_;
}


// The read pointer chases the write pointer in the circular queue.
// This method checks that the write pointer hasn't advanced so much
// that it has gone full circle and caught up with the read pointer.
//
// can_write() returns false when there is an empty slot but the read pointer
// points to the first element and the write pointer to the last element.
// That should be rare enough that it is not worth the extra bookkeeping
// to work around that. It's not harmful either, just mildly inefficient.
bool TaskQueue::can_write() const {
return next(write_off_) != read_off_;
}


} // namespace node
15 changes: 6 additions & 9 deletions src/node_v8_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,15 @@ class TaskQueue {
v8::Task* Shift();

private:
static const unsigned int kRingSize = 1024;
static const unsigned int kRingMask = kRingSize - 1;

static_assert(kRingSize == (kRingSize & ~kRingMask),
"kRingSize is not a power of two");

uv_sem_t sem_;
uv_cond_t cond_;
static unsigned int next(unsigned int n);
bool can_read() const;
bool can_write() const;
uv_cond_t read_cond_;
uv_cond_t write_cond_;
uv_mutex_t mutex_;
unsigned int read_off_;
unsigned int write_off_;
v8::Task* ring_[kRingSize];
v8::Task* ring_[1024];
};

class Platform : public v8::Platform {
Expand Down

0 comments on commit 3543c55

Please sign in to comment.