From 7ee9fa600b755946e87ce8f269728c57a0a943e4 Mon Sep 17 00:00:00 2001 From: "dcheng@chromium.org" Date: Tue, 29 Nov 2011 04:40:20 +0000 Subject: [PATCH] Revert 111865 - Remove several more custom Task implementations from base/ BUG=none TEST=trybots Review URL: http://codereview.chromium.org/8702016 TBR=dcheng@chromium.org Review URL: http://codereview.chromium.org/8729020 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@111868 0039d316-1c4b-4281-b951-d872f2087c98 --- base/threading/worker_pool_posix_unittest.cc | 131 ++++++++++++------- base/win/object_watcher.cc | 100 +++++++++----- base/win/object_watcher.h | 10 +- 3 files changed, 151 insertions(+), 90 deletions(-) diff --git a/base/threading/worker_pool_posix_unittest.cc b/base/threading/worker_pool_posix_unittest.cc index 1b700e1d7519..97e8807e3cdc 100644 --- a/base/threading/worker_pool_posix_unittest.cc +++ b/base/threading/worker_pool_posix_unittest.cc @@ -6,10 +6,9 @@ #include -#include "base/bind.h" -#include "base/callback.h" #include "base/synchronization/condition_variable.h" #include "base/synchronization/lock.h" +#include "base/task.h" #include "base/threading/platform_thread.h" #include "base/synchronization/waitable_event.h" #include "testing/gtest/include/gtest/gtest.h" @@ -51,41 +50,75 @@ namespace { // threads used if more than one IncrementingTask is consecutively posted to the // thread pool, since the first one might finish executing before the subsequent // PostTask() calls get invoked. -void IncrementingTask(Lock* counter_lock, - int* counter, - Lock* unique_threads_lock, - std::set* unique_threads) { - { - base::AutoLock locked(*unique_threads_lock); - unique_threads->insert(PlatformThread::CurrentId()); +class IncrementingTask : public Task { + public: + IncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set* unique_threads) + : counter_lock_(counter_lock), + unique_threads_lock_(unique_threads_lock), + unique_threads_(unique_threads), + counter_(counter) {} + + virtual void Run() { + AddSelfToUniqueThreadSet(); + base::AutoLock locked(*counter_lock_); + (*counter_)++; } - base::AutoLock locked(*counter_lock); - (*counter)++; -} -// BlockingIncrementingTask is a simple wrapper around IncrementingTask that -// allows for waiting at the start of Run() for a WaitableEvent to be signalled. -struct BlockingIncrementingTaskArgs { - Lock* counter_lock; - int* counter; - Lock* unique_threads_lock; - std::set* unique_threads; - Lock* num_waiting_to_start_lock; - int* num_waiting_to_start; - ConditionVariable* num_waiting_to_start_cv; - base::WaitableEvent* start; + void AddSelfToUniqueThreadSet() { + base::AutoLock locked(*unique_threads_lock_); + unique_threads_->insert(PlatformThread::CurrentId()); + } + + private: + Lock* counter_lock_; + Lock* unique_threads_lock_; + std::set* unique_threads_; + int* counter_; + + DISALLOW_COPY_AND_ASSIGN(IncrementingTask); }; -void BlockingIncrementingTask(const BlockingIncrementingTaskArgs& args) { - { - base::AutoLock num_waiting_to_start_locked(*args.num_waiting_to_start_lock); - (*args.num_waiting_to_start)++; +// BlockingIncrementingTask is a simple wrapper around IncrementingTask that +// allows for waiting at the start of Run() for a WaitableEvent to be signalled. +class BlockingIncrementingTask : public Task { + public: + BlockingIncrementingTask(Lock* counter_lock, + int* counter, + Lock* unique_threads_lock, + std::set* unique_threads, + Lock* num_waiting_to_start_lock, + int* num_waiting_to_start, + ConditionVariable* num_waiting_to_start_cv, + base::WaitableEvent* start) + : incrementer_( + counter_lock, counter, unique_threads_lock, unique_threads), + num_waiting_to_start_lock_(num_waiting_to_start_lock), + num_waiting_to_start_(num_waiting_to_start), + num_waiting_to_start_cv_(num_waiting_to_start_cv), + start_(start) {} + + virtual void Run() { + { + base::AutoLock num_waiting_to_start_locked(*num_waiting_to_start_lock_); + (*num_waiting_to_start_)++; + } + num_waiting_to_start_cv_->Signal(); + start_->Wait(); + incrementer_.Run(); } - args.num_waiting_to_start_cv->Signal(); - args.start->Wait(); - IncrementingTask(args.counter_lock, args.counter, args.unique_threads_lock, - args.unique_threads); -} + + private: + IncrementingTask incrementer_; + Lock* num_waiting_to_start_lock_; + int* num_waiting_to_start_; + ConditionVariable* num_waiting_to_start_cv_; + base::WaitableEvent* start_; + + DISALLOW_COPY_AND_ASSIGN(BlockingIncrementingTask); +}; class PosixDynamicThreadPoolTest : public testing::Test { protected: @@ -120,18 +153,16 @@ class PosixDynamicThreadPoolTest : public testing::Test { } } - base::Closure CreateNewIncrementingTaskCallback() { - return base::Bind(&IncrementingTask, &counter_lock_, &counter_, - &unique_threads_lock_, &unique_threads_); + Task* CreateNewIncrementingTask() { + return new IncrementingTask(&counter_lock_, &counter_, + &unique_threads_lock_, &unique_threads_); } - base::Closure CreateNewBlockingIncrementingTaskCallback() { - BlockingIncrementingTaskArgs args = { + Task* CreateNewBlockingIncrementingTask() { + return new BlockingIncrementingTask( &counter_lock_, &counter_, &unique_threads_lock_, &unique_threads_, &num_waiting_to_start_lock_, &num_waiting_to_start_, - &num_waiting_to_start_cv_, &start_ - }; - return base::Bind(&BlockingIncrementingTask, args); + &num_waiting_to_start_cv_, &start_); } scoped_refptr pool_; @@ -154,7 +185,7 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { EXPECT_EQ(0U, peer_.pending_tasks().size()); // Add one task and wait for it to be completed. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); @@ -166,13 +197,13 @@ TEST_F(PosixDynamicThreadPoolTest, Basic) { TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { // Add one task and wait for it to be completed. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); // Add another 2 tasks. One should reuse the existing worker thread. - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); WaitForTasksToStart(2); start_.Signal(); @@ -185,8 +216,8 @@ TEST_F(PosixDynamicThreadPoolTest, ReuseIdle) { TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { // Add two blocking tasks. - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); EXPECT_EQ(0, counter_) << "Blocking tasks should not have started yet."; @@ -201,14 +232,14 @@ TEST_F(PosixDynamicThreadPoolTest, TwoActiveTasks) { TEST_F(PosixDynamicThreadPoolTest, Complex) { // Add two non blocking tasks and wait for them to finish. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); // Add two blocking tasks, start them simultaneously, and wait for them to // finish. - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); - pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); + pool_->PostTask(FROM_HERE, CreateNewBlockingIncrementingTask()); WaitForTasksToStart(2); start_.Signal(); @@ -228,7 +259,7 @@ TEST_F(PosixDynamicThreadPoolTest, Complex) { } // Add another non blocking task. There are no threads to reuse. - pool_->PostTask(FROM_HERE, CreateNewIncrementingTaskCallback()); + pool_->PostTask(FROM_HERE, CreateNewIncrementingTask()); WaitForIdleThreads(1); EXPECT_EQ(3U, unique_threads_.size()); diff --git a/base/win/object_watcher.cc b/base/win/object_watcher.cc index be1e4090d2a6..4f5e7aba5016 100644 --- a/base/win/object_watcher.cc +++ b/base/win/object_watcher.cc @@ -4,7 +4,6 @@ #include "base/win/object_watcher.h" -#include "base/bind.h" #include "base/logging.h" namespace base { @@ -12,12 +11,30 @@ namespace win { //----------------------------------------------------------------------------- -ObjectWatcher::ObjectWatcher() - : weak_factory_(this), - object_(NULL), - wait_object_(NULL), - origin_loop_(NULL), - delegate_(NULL) { +struct ObjectWatcher::Watch : public Task { + ObjectWatcher* watcher; // The associated ObjectWatcher instance + HANDLE object; // The object being watched + HANDLE wait_object; // Returned by RegisterWaitForSingleObject + MessageLoop* origin_loop; // Used to get back to the origin thread + Delegate* delegate; // Delegate to notify when signaled + bool did_signal; // DoneWaiting was called + + virtual void Run() { + // The watcher may have already been torn down, in which case we need to + // just get out of dodge. + if (!watcher) + return; + + DCHECK(did_signal); + watcher->StopWatching(); + + delegate->OnObjectSignaled(object); + } +}; + +//----------------------------------------------------------------------------- + +ObjectWatcher::ObjectWatcher() : watch_(NULL) { } ObjectWatcher::~ObjectWatcher() { @@ -25,25 +42,30 @@ ObjectWatcher::~ObjectWatcher() { } bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) { - if (wait_object_) { + if (watch_) { NOTREACHED() << "Already watching an object"; return false; } - origin_loop_ = MessageLoop::current(); - delegate_ = delegate; + Watch* watch = new Watch; + watch->watcher = this; + watch->object = object; + watch->origin_loop = MessageLoop::current(); + watch->delegate = delegate; + watch->did_signal = false; // Since our job is to just notice when an object is signaled and report the // result back to this thread, we can just run on a Windows wait thread. DWORD wait_flags = WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE; - if (!RegisterWaitForSingleObject(&wait_object_, object, DoneWaiting, - this, INFINITE, wait_flags)) { + if (!RegisterWaitForSingleObject(&watch->wait_object, object, DoneWaiting, + watch, INFINITE, wait_flags)) { NOTREACHED() << "RegisterWaitForSingleObject failed: " << GetLastError(); + delete watch; return false; } - object_ = object; + watch_ = watch; // We need to know if the current message loop is going away so we can // prevent the wait thread from trying to access a dead message loop. @@ -52,46 +74,60 @@ bool ObjectWatcher::StartWatching(HANDLE object, Delegate* delegate) { } bool ObjectWatcher::StopWatching() { - if (!wait_object_) + if (!watch_) return false; // Make sure ObjectWatcher is used in a single-threaded fashion. - DCHECK(origin_loop_ == MessageLoop::current()); + DCHECK(watch_->origin_loop == MessageLoop::current()); - // Blocking call to cancel the wait. Any callbacks already in progress will - // finish before we return from this call. - if (!UnregisterWaitEx(wait_object_, INVALID_HANDLE_VALUE)) { + // If DoneWaiting is in progress, we wait for it to finish. We know whether + // DoneWaiting happened or not by inspecting the did_signal flag. + if (!UnregisterWaitEx(watch_->wait_object, INVALID_HANDLE_VALUE)) { NOTREACHED() << "UnregisterWaitEx failed: " << GetLastError(); return false; } - weak_factory_.InvalidateWeakPtrs(); - object_ = NULL; - wait_object_ = NULL; + // Make sure that we see any mutation to did_signal. This should be a no-op + // since we expect that UnregisterWaitEx resulted in a memory barrier, but + // just to be sure, we're going to be explicit. + MemoryBarrier(); + + // If the watch has been posted, then we need to make sure it knows not to do + // anything once it is run. + watch_->watcher = NULL; + + // If DoneWaiting was called, then the watch would have been posted as a + // task, and will therefore be deleted by the MessageLoop. Otherwise, we + // need to take care to delete it here. + if (!watch_->did_signal) + delete watch_; + + watch_ = NULL; MessageLoop::current()->RemoveDestructionObserver(this); return true; } HANDLE ObjectWatcher::GetWatchedObject() { - return object_; + if (!watch_) + return NULL; + + return watch_->object; } // static void CALLBACK ObjectWatcher::DoneWaiting(void* param, BOOLEAN timed_out) { DCHECK(!timed_out); - // The destructor blocks on any callbacks that are in flight, so we know that - // that is always a pointer to a valid ObjectWater. - ObjectWatcher* that = static_cast(param); - that->origin_loop_->PostTask( - FROM_HERE, - base::Bind(&ObjectWatcher::Signal, that->weak_factory_.GetWeakPtr())); -} + Watch* watch = static_cast(param); -void ObjectWatcher::Signal() { - StopWatching(); - delegate_->OnObjectSignaled(object_); + // Record that we ran this function. + watch->did_signal = true; + + // We rely on the locking in PostTask() to ensure that a memory barrier is + // provided, which in turn ensures our change to did_signal can be observed + // on the target thread. + watch->origin_loop->PostTask(FROM_HERE, watch); } void ObjectWatcher::WillDestroyCurrentMessageLoop() { diff --git a/base/win/object_watcher.h b/base/win/object_watcher.h index 86ac2d0bc84b..f5a46ebab8ee 100644 --- a/base/win/object_watcher.h +++ b/base/win/object_watcher.h @@ -9,7 +9,6 @@ #include #include "base/base_export.h" -#include "base/memory/weak_ptr.h" #include "base/message_loop.h" namespace base { @@ -80,17 +79,12 @@ class BASE_EXPORT ObjectWatcher : public MessageLoop::DestructionObserver { // Called on a background thread when done waiting. static void CALLBACK DoneWaiting(void* param, BOOLEAN timed_out); - void Signal(); - // MessageLoop::DestructionObserver implementation: virtual void WillDestroyCurrentMessageLoop(); // Internal state. - base::WeakPtrFactory weak_factory_; - HANDLE object_; // The object being watched - HANDLE wait_object_; // Returned by RegisterWaitForSingleObject - MessageLoop* origin_loop_; // Used to get back to the origin thread - Delegate* delegate_; // Delegate to notify when signaled + struct Watch; + Watch* watch_; DISALLOW_COPY_AND_ASSIGN(ObjectWatcher); };