Skip to content

Commit

Permalink
Revert 111865 - Remove several more custom Task implementations from …
Browse files Browse the repository at this point in the history
…base/

BUG=none
TEST=trybots

Review URL: http://codereview.chromium.org/8702016

TBR=dcheng@chromium.org
Review URL: http://codereview.chromium.org/8729020

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@111868 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
dcheng@chromium.org committed Nov 29, 2011
1 parent 3a117c6 commit 7ee9fa6
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 90 deletions.
131 changes: 81 additions & 50 deletions base/threading/worker_pool_posix_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

#include <set>

#include "base/bind.h"
#include "base/callback.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task.h"
#include "base/threading/platform_thread.h"
#include "base/synchronization/waitable_event.h"
#include "testing/gtest/include/gtest/gtest.h"
Expand Down Expand Up @@ -51,41 +50,75 @@ namespace {
// threads used if more than one IncrementingTask is consecutively posted to the
// thread pool, since the first one might finish executing before the subsequent
// PostTask() calls get invoked.
void IncrementingTask(Lock* counter_lock,
int* counter,
Lock* unique_threads_lock,
std::set<PlatformThreadId>* unique_threads) {
{
base::AutoLock locked(*unique_threads_lock);
unique_threads->insert(PlatformThread::CurrentId());
class IncrementingTask : public Task {
public:
IncrementingTask(Lock* counter_lock,
int* counter,
Lock* unique_threads_lock,
std::set<PlatformThreadId>* unique_threads)
: counter_lock_(counter_lock),
unique_threads_lock_(unique_threads_lock),
unique_threads_(unique_threads),
counter_(counter) {}

virtual void Run() {
AddSelfToUniqueThreadSet();
base::AutoLock locked(*counter_lock_);
(*counter_)++;
}
base::AutoLock locked(*counter_lock);
(*counter)++;
}

// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
struct BlockingIncrementingTaskArgs {
Lock* counter_lock;
int* counter;
Lock* unique_threads_lock;
std::set<PlatformThreadId>* unique_threads;
Lock* num_waiting_to_start_lock;
int* num_waiting_to_start;
ConditionVariable* num_waiting_to_start_cv;
base::WaitableEvent* start;
void AddSelfToUniqueThreadSet() {
base::AutoLock locked(*unique_threads_lock_);
unique_threads_->insert(PlatformThread::CurrentId());
}

private:
Lock* counter_lock_;
Lock* unique_threads_lock_;
std::set<PlatformThreadId>* unique_threads_;
int* counter_;

DISALLOW_COPY_AND_ASSIGN(IncrementingTask);
};

void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) {
{
base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock);
(*args.num_waiting_to_start)++;
// BlockingIncrementingTask is a simple wrapper around IncrementingTask that
// allows for waiting at the start of Run() for a WaitableEvent to be signalled.
class BlockingIncrementingTask : public Task {
public:
BlockingIncrementingTask(Lock* counter_lock,
int* counter,
Lock* unique_threads_lock,
std::set<PlatformThreadId>* unique_threads,
Lock* num_waiting_to_start_lock,
int* num_waiting_to_start,
ConditionVariable* num_waiting_to_start_cv,
base::WaitableEvent* start)
: incrementer_(
counter_lock, counter, unique_threads_lock, unique_threads),
num_waiting_to_start_lock_(num_waiting_to_start_lock),
num_waiting_to_start_(num_waiting_to_start),
num_waiting_to_start_cv_(num_waiting_to_start_cv),
start_(start) {}

virtual void Run() {
{
base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_);
(*num_waiting_to_start_)++;
}
num_waiting_to_start_cv_->Signal();
start_->Wait();
incrementer_.Run();
}
args.num_waiting_to_start_cv->Signal();
args.start->Wait();
IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock,
args.unique_threads);
}

private:
IncrementingTask incrementer_;
Lock* num_waiting_to_start_lock_;
int* num_waiting_to_start_;
ConditionVariable* num_waiting_to_start_cv_;
base::WaitableEvent* start_;

DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask);
};

class PosixDynamicThreadPoolTest : public testing::Test {
protected:
Expand Down Expand Up @@ -120,18 +153,16 @@ class PosixDynamicThreadPoolTest : public testing::Test {
}
}

base::Closure CreateNewIncrementingTaskCallback() {
return base::Bind(&IncrementingTask, &counter_lock_, &counter_,
&unique_threads_lock_, &unique_threads_);
Task* CreateNewIncrementingTask() {
return new IncrementingTask(&counter_lock_, &counter_,
&unique_threads_lock_, &unique_threads_);
}

base::Closure CreateNewBlockingIncrementingTaskCallback() {
BlockingIncrementingTaskArgs args = {
Task* CreateNewBlockingIncrementingTask() {
return new BlockingIncrementingTask(
&counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_,
&num_waiting_to_start_lock_, &num_waiting_to_start_,
&num_waiting_to_start_cv_, &start_
};
return base::Bind(&BlockingIncrementingTask, args);
&num_waiting_to_start_cv_, &start_);
}

scoped_refptr<base::PosixDynamicThreadPool> pool_;
Expand All @@ -154,7 +185,7 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {
EXPECT_EQ(0U, peer_.pending_tasks().size());

// Add one task and wait for it to be completed.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());

WaitForIdleThreads(1);

Expand All @@ -166,13 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) {

TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {
// Add one task and wait for it to be completed.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());

WaitForIdleThreads(1);

// Add another 2 tasks. One should reuse the existing worker thread.
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());

WaitForTasksToStart(2);
start_.Signal();
Expand All @@ -185,8 +216,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) {

TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {
// Add two blocking tasks.
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());

EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet.";

Expand All @@ -201,14 +232,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) {

TEST_F(PosixDynamicThreadPoolTest, Complex) {
// Add two non blocking tasks and wait for them to finish.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());

WaitForIdleThreads(1);

// Add two blocking tasks, start them simultaneously, and wait for them to
// finish.
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());
pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask());

WaitForTasksToStart(2);
start_.Signal();
Expand All @@ -228,7 +259,7 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) {
}

// Add another non blocking task. There are no threads to reuse.
pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback());
pool_->PostTask(FROM_HERE, CreateNewIncrementingTask());
WaitForIdleThreads(1);

EXPECT_EQ(3U, unique_threads_.size());
Expand Down
100 changes: 68 additions & 32 deletions base/win/object_watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,68 @@

#include "base/win/object_watcher.h"

#include "base/bind.h"
#include "base/logging.h"

namespace base {
namespace win {

//-----------------------------------------------------------------------------

ObjectWatcher::ObjectWatcher()
: weak_factory_(this),
object_(NULL),
wait_object_(NULL),
origin_loop_(NULL),
delegate_(NULL) {
struct ObjectWatcher::Watch : public Task {
ObjectWatcher* watcher; // The associated ObjectWatcher instance
HANDLE object; // The object being watched
HANDLE wait_object; // Returned by RegisterWaitForSingleObject
MessageLoop* origin_loop; // Used to get back to the origin thread
Delegate* delegate; // Delegate to notify when signaled
bool did_signal; // DoneWaiting was called

virtual void Run() {
// The watcher may have already been torn down, in which case we need to
// just get out of dodge.
if (!watcher)
return;

DCHECK(did_signal);
watcher->StopWatching();

delegate->OnObjectSignaled(object);
}
};

//-----------------------------------------------------------------------------

ObjectWatcher::ObjectWatcher() : watch_(NULL) {
}

ObjectWatcher::~ObjectWatcher() {
StopWatching();
}

bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) {
if (wait_object_) {
if (watch_) {
NOTREACHED() << "Already watching an object";
return false;
}

origin_loop_ = MessageLoop::current();
delegate_ = delegate;
Watch* watch = new Watch;
watch->watcher = this;
watch->object = object;
watch->origin_loop = MessageLoop::current();
watch->delegate = delegate;
watch->did_signal = false;

// Since our job is to just notice when an object is signaled and report the
// result back to this thread, we can just run on a Windows wait thread.
DWORD wait_flags = WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE;

if (!RegisterWaitForSingleObject(&wait_object_, object, DoneWaiting,
this, INFINITE, wait_flags)) {
if (!RegisterWaitForSingleObject(&watch->wait_object, object, DoneWaiting,
watch, INFINITE, wait_flags)) {
NOTREACHED() << "RegisterWaitForSingleObject failed: " << GetLastError();
delete watch;
return false;
}

object_ = object;
watch_ = watch;

// We need to know if the current message loop is going away so we can
// prevent the wait thread from trying to access a dead message loop.
Expand All @@ -52,46 +74,60 @@ bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) {
}

bool ObjectWatcher::StopWatching() {
if (!wait_object_)
if (!watch_)
return false;

// Make sure ObjectWatcher is used in a single-threaded fashion.
DCHECK(origin_loop_ == MessageLoop::current());
DCHECK(watch_->origin_loop == MessageLoop::current());

// Blocking call to cancel the wait. Any callbacks already in progress will
// finish before we return from this call.
if (!UnregisterWaitEx(wait_object_, INVALID_HANDLE_VALUE)) {
// If DoneWaiting is in progress, we wait for it to finish. We know whether
// DoneWaiting happened or not by inspecting the did_signal flag.
if (!UnregisterWaitEx(watch_->wait_object, INVALID_HANDLE_VALUE)) {
NOTREACHED() << "UnregisterWaitEx failed: " << GetLastError();
return false;
}

weak_factory_.InvalidateWeakPtrs();
object_ = NULL;
wait_object_ = NULL;
// Make sure that we see any mutation to did_signal. This should be a no-op
// since we expect that UnregisterWaitEx resulted in a memory barrier, but
// just to be sure, we're going to be explicit.
MemoryBarrier();

// If the watch has been posted, then we need to make sure it knows not to do
// anything once it is run.
watch_->watcher = NULL;

// If DoneWaiting was called, then the watch would have been posted as a
// task, and will therefore be deleted by the MessageLoop. Otherwise, we
// need to take care to delete it here.
if (!watch_->did_signal)
delete watch_;

watch_ = NULL;

MessageLoop::current()->RemoveDestructionObserver(this);
return true;
}

HANDLE ObjectWatcher::GetWatchedObject() {
return object_;
if (!watch_)
return NULL;

return watch_->object;
}

// static
void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) {
DCHECK(!timed_out);

// The destructor blocks on any callbacks that are in flight, so we know that
// that is always a pointer to a valid ObjectWater.
ObjectWatcher* that = static_cast<ObjectWatcher*>(param);
that->origin_loop_->PostTask(
FROM_HERE,
base::Bind(&ObjectWatcher::Signal, that->weak_factory_.GetWeakPtr()));
}
Watch* watch = static_cast<Watch*>(param);

void ObjectWatcher::Signal() {
StopWatching();
delegate_->OnObjectSignaled(object_);
// Record that we ran this function.
watch->did_signal = true;

// We rely on the locking in PostTask() to ensure that a memory barrier is
// provided, which in turn ensures our change to did_signal can be observed
// on the target thread.
watch->origin_loop->PostTask(FROM_HERE, watch);
}

void ObjectWatcher::WillDestroyCurrentMessageLoop() {
Expand Down
10 changes: 2 additions & 8 deletions base/win/object_watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <windows.h>

#include "base/base_export.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"

namespace base {
Expand Down Expand Up @@ -80,17 +79,12 @@ class BASE_EXPORT ObjectWatcher : public MessageLoop::DestructionObserver {
// Called on a background thread when done waiting.
static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out);

void Signal();

// MessageLoop::DestructionObserver implementation:
virtual void WillDestroyCurrentMessageLoop();

// Internal state.
base::WeakPtrFactory<ObjectWatcher> weak_factory_;
HANDLE object_; // The object being watched
HANDLE wait_object_; // Returned by RegisterWaitForSingleObject
MessageLoop* origin_loop_; // Used to get back to the origin thread
Delegate* delegate_; // Delegate to notify when signaled
struct Watch;
Watch* watch_;

DISALLOW_COPY_AND_ASSIGN(ObjectWatcher);
};
Expand Down

0 comments on commit 7ee9fa6

Please sign in to comment.