Skip to content

Commit

Permalink
Reland (3rd try): Lazily initialize MessageLoop for faster thread sta…
Browse files Browse the repository at this point in the history
…rtup

Original review: https://codereview.chromium.org/1011683002/
2nd try: https://codereview.chromium.org/1129953004/

2nd try reverted due to race reports on Linux:
https://crbug.com/489263 Data races on valid_thread_id_ after r330329

This fixes:
- Race in MessageLoopProxyImpl by introducing lock
- Race in BrowserMainLoop/BrowserThreadImpl, where BrowserThread::CurrentlyOn()
  called on one of BrowserThreads tries to touch other thread's message_loop()
  via global thread table.

Reg: the latter race, the code flow that causes this race is like following:

  // On the main thread, we create all known browser threads:
  for (...) {
    {
      AutoLock lock(g_lock);
      g_threads[id] = new BrowserProcessSubThread();
    }
    // [A] This initializes the thread's message_loop, which causes a race
    // against [B] in the new code because new threads can start running
    // immediately.
    thread->StartWithOptions();
  }

  // On the new thread's main function, it calls CurrentlyOn() which does:
  {
    // [B] This touches other thread's Thread::message_loop.
    AutoLock lock(g_lock);
    return g_threads[other_thread_id] &&
           g_threads[other_thread_id]->message_loop() == MessageLoop::current();
  }

This was safe before because both message_loop initialization and the first
call to CurrentlyOn() on the new thread was done synchronously in
StartWithOptions() while the main thread was blocked. In the new code
new threads can start accessing message_loop() asynchronously while
the main thread's for loop is running.

PS1 is the original patch (2nd try) that got reverted.

BUG=465458, 489263

Review URL: https://codereview.chromium.org/1131513007

Cr-Commit-Position: refs/heads/master@{#331235}
  • Loading branch information
kinu authored and Commit bot committed May 23, 2015
1 parent b955d50 commit 7f68f87
Show file tree
Hide file tree
Showing 25 changed files with 300 additions and 181 deletions.
36 changes: 26 additions & 10 deletions base/message_loop/incoming_task_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ IncomingTaskQueue::IncomingTaskQueue(MessageLoop* message_loop)
message_loop_(message_loop),
next_sequence_num_(0),
message_loop_scheduled_(false),
always_schedule_work_(AlwaysNotifyPump(message_loop_->type())) {
always_schedule_work_(AlwaysNotifyPump(message_loop_->type())),
is_ready_for_scheduling_(false) {
}

bool IncomingTaskQueue::AddToIncomingQueue(
Expand Down Expand Up @@ -109,6 +110,15 @@ void IncomingTaskQueue::WillDestroyCurrentMessageLoop() {
message_loop_ = NULL;
}

void IncomingTaskQueue::StartScheduling() {
AutoLock lock(incoming_queue_lock_);
DCHECK(!is_ready_for_scheduling_);
DCHECK(!message_loop_scheduled_);
is_ready_for_scheduling_ = true;
if (!incoming_queue_.empty())
ScheduleWork();
}

IncomingTaskQueue::~IncomingTaskQueue() {
// Verify that WillDestroyCurrentMessageLoop() has been called.
DCHECK(!message_loop_);
Expand Down Expand Up @@ -148,19 +158,25 @@ bool IncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
incoming_queue_.push(*pending_task);
pending_task->task.Reset();

if (always_schedule_work_ || (!message_loop_scheduled_ && was_empty)) {
// Wake up the message loop.
message_loop_->ScheduleWork();
// After we've scheduled the message loop, we do not need to do so again
// until we know it has processed all of the work in our queue and is
// waiting for more work again. The message loop will always attempt to
// reload from the incoming queue before waiting again so we clear this flag
// in ReloadWorkQueue().
message_loop_scheduled_ = true;
if (is_ready_for_scheduling_ &&
(always_schedule_work_ || (!message_loop_scheduled_ && was_empty))) {
ScheduleWork();
}

return true;
}

void IncomingTaskQueue::ScheduleWork() {
DCHECK(is_ready_for_scheduling_);
// Wake up the message loop.
message_loop_->ScheduleWork();
// After we've scheduled the message loop, we do not need to do so again
// until we know it has processed all of the work in our queue and is
// waiting for more work again. The message loop will always attempt to
// reload from the incoming queue before waiting again so we clear this flag
// in ReloadWorkQueue().
message_loop_scheduled_ = true;
}

} // namespace internal
} // namespace base
10 changes: 10 additions & 0 deletions base/message_loop/incoming_task_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ class BASE_EXPORT IncomingTaskQueue
// Disconnects |this| from the parent message loop.
void WillDestroyCurrentMessageLoop();

// This should be called when the message loop becomes ready for
// scheduling work.
void StartScheduling();

private:
friend class RefCountedThreadSafe<IncomingTaskQueue>;
virtual ~IncomingTaskQueue();
Expand All @@ -66,6 +70,9 @@ class BASE_EXPORT IncomingTaskQueue
// does not retain |pending_task->task| beyond this function call.
bool PostPendingTask(PendingTask* pending_task);

// Wakes up the message loop and schedules work.
void ScheduleWork();

// Number of tasks that require high resolution timing. This value is kept
// so that ReloadWorkQueue() completes in constant time.
int high_res_task_count_;
Expand All @@ -92,6 +99,9 @@ class BASE_EXPORT IncomingTaskQueue
// if the incoming queue was not empty.
const bool always_schedule_work_;

// False until StartScheduling() is called.
bool is_ready_for_scheduling_;

DISALLOW_COPY_AND_ASSIGN(IncomingTaskQueue);
};

Expand Down
80 changes: 47 additions & 33 deletions base/message_loop/message_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ MessagePumpForIO* ToPumpIO(MessagePump* pump) {
}
#endif // !defined(OS_NACL_SFI)

scoped_ptr<MessagePump> ReturnPump(scoped_ptr<MessagePump> pump) {
return pump;
}

} // namespace

//------------------------------------------------------------------------------
Expand All @@ -116,41 +120,19 @@ MessageLoop::DestructionObserver::~DestructionObserver() {
//------------------------------------------------------------------------------

MessageLoop::MessageLoop(Type type)
: type_(type),
#if defined(OS_WIN)
pending_high_res_tasks_(0),
in_high_res_mode_(false),
#endif
nestable_tasks_allowed_(true),
#if defined(OS_WIN)
os_modal_loop_(false),
#endif // OS_WIN
message_histogram_(NULL),
run_loop_(NULL) {
Init();

pump_ = CreateMessagePumpForType(type).Pass();
: MessageLoop(type, MessagePumpFactoryCallback()) {
BindToCurrentThread();
}

MessageLoop::MessageLoop(scoped_ptr<MessagePump> pump)
: pump_(pump.Pass()),
type_(TYPE_CUSTOM),
#if defined(OS_WIN)
pending_high_res_tasks_(0),
in_high_res_mode_(false),
#endif
nestable_tasks_allowed_(true),
#if defined(OS_WIN)
os_modal_loop_(false),
#endif // OS_WIN
message_histogram_(NULL),
run_loop_(NULL) {
DCHECK(pump_.get());
Init();
: MessageLoop(TYPE_CUSTOM, Bind(&ReturnPump, Passed(&pump))) {
BindToCurrentThread();
}

MessageLoop::~MessageLoop() {
DCHECK_EQ(this, current());
// current() could be NULL if this message loop is destructed before it is
// bound to a thread.
DCHECK(current() == this || !current());

// iOS just attaches to the loop, it doesn't Run it.
// TODO(stuartmorgan): Consider wiring up a Detach().
Expand Down Expand Up @@ -299,11 +281,13 @@ void MessageLoop::PostNonNestableDelayedTask(
}

void MessageLoop::Run() {
DCHECK(pump_);
RunLoop run_loop;
run_loop.Run();
}

void MessageLoop::RunUntilIdle() {
DCHECK(pump_);
RunLoop run_loop;
run_loop.RunUntilIdle();
}
Expand Down Expand Up @@ -383,13 +367,43 @@ bool MessageLoop::IsIdleForTesting() {

//------------------------------------------------------------------------------

void MessageLoop::Init() {
scoped_ptr<MessageLoop> MessageLoop::CreateUnbound(
Type type, MessagePumpFactoryCallback pump_factory) {
return make_scoped_ptr(new MessageLoop(type, pump_factory));
}

MessageLoop::MessageLoop(Type type, MessagePumpFactoryCallback pump_factory)
: type_(type),
#if defined(OS_WIN)
pending_high_res_tasks_(0),
in_high_res_mode_(false),
#endif
nestable_tasks_allowed_(true),
#if defined(OS_WIN)
os_modal_loop_(false),
#endif // OS_WIN
pump_factory_(pump_factory),
message_histogram_(NULL),
run_loop_(NULL),
incoming_task_queue_(new internal::IncomingTaskQueue(this)),
message_loop_proxy_(
new internal::MessageLoopProxyImpl(incoming_task_queue_)) {
// If type is TYPE_CUSTOM non-null pump_factory must be given.
DCHECK_EQ(type_ == TYPE_CUSTOM, !pump_factory_.is_null());
}

void MessageLoop::BindToCurrentThread() {
DCHECK(!pump_);
if (!pump_factory_.is_null())
pump_ = pump_factory_.Run();
else
pump_ = CreateMessagePumpForType(type_);

DCHECK(!current()) << "should only have one message loop per thread";
lazy_tls_ptr.Pointer()->Set(this);

incoming_task_queue_ = new internal::IncomingTaskQueue(this);
message_loop_proxy_ =
new internal::MessageLoopProxyImpl(incoming_task_queue_);
incoming_task_queue_->StartScheduling();
message_loop_proxy_->BindToCurrentThread();
thread_task_runner_handle_.reset(
new ThreadTaskRunnerHandle(message_loop_proxy_));
}
Expand Down
43 changes: 36 additions & 7 deletions base/message_loop/message_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
explicit MessageLoop(Type type = TYPE_DEFAULT);
// Creates a TYPE_CUSTOM MessageLoop with the supplied MessagePump, which must
// be non-NULL.
explicit MessageLoop(scoped_ptr<base::MessagePump> pump);
explicit MessageLoop(scoped_ptr<MessagePump> pump);

~MessageLoop() override;

// Returns the MessageLoop object for the current thread, or null if none.
Expand Down Expand Up @@ -394,10 +395,6 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
// Returns true if the message loop is "idle". Provided for testing.
bool IsIdleForTesting();

// Wakes up the message pump. Can be called on any thread. The caller is
// responsible for synchronizing ScheduleWork() calls.
void ScheduleWork();

// Returns the TaskAnnotator which is used to add debug information to posted
// tasks.
debug::TaskAnnotator* task_annotator() { return &task_annotator_; }
Expand All @@ -411,9 +408,33 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {

private:
friend class RunLoop;
friend class internal::IncomingTaskQueue;
friend class ScheduleWorkTest;
friend class Thread;

using MessagePumpFactoryCallback = Callback<scoped_ptr<MessagePump>()>;

// Configures various members for the two constructors.
void Init();
// Creates a MessageLoop without binding to a thread.
// If |type| is TYPE_CUSTOM non-null |pump_factory| must be also given
// to create a message pump for this message loop. Otherwise a default
// message pump for the |type| is created.
//
// It is valid to call this to create a new message loop on one thread,
// and then pass it to the thread where the message loop actually runs.
// The message loop's BindToCurrentThread() method must be called on the
// thread the message loop runs on, before calling Run().
// Before BindToCurrentThread() is called only Post*Task() functions can
// be called on the message loop.
scoped_ptr<MessageLoop> CreateUnbound(
Type type,
MessagePumpFactoryCallback pump_factory);

// Common private constructor. Other constructors delegate the initialization
// to this constructor.
MessageLoop(Type type, MessagePumpFactoryCallback pump_factory);

// Configure various members and bind this message loop to the current thread.
void BindToCurrentThread();

// Invokes the actual run loop using the message pump.
void RunHandler();
Expand All @@ -437,6 +458,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
// empty.
void ReloadWorkQueue();

// Wakes up the message pump. Can be called on any thread. The caller is
// responsible for synchronizing ScheduleWork() calls.
void ScheduleWork();

// Start recording histogram info about events and action IF it was enabled
// and IF the statistics recorder can accept a registration of our histogram.
void StartHistogrammer();
Expand Down Expand Up @@ -490,6 +515,10 @@ class BASE_EXPORT MessageLoop : public MessagePump::Delegate {
bool os_modal_loop_;
#endif

// pump_factory_.Run() is called to create a message pump for this loop
// if type_ is TYPE_CUSTOM and pump_ is null.
MessagePumpFactoryCallback pump_factory_;

std::string thread_name_;
// A profiling histogram showing the counts of various messages and events.
HistogramBase* message_histogram_;
Expand Down
9 changes: 8 additions & 1 deletion base/message_loop/message_loop_proxy_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ namespace internal {
MessageLoopProxyImpl::MessageLoopProxyImpl(
scoped_refptr<IncomingTaskQueue> incoming_queue)
: incoming_queue_(incoming_queue),
valid_thread_id_(PlatformThread::CurrentId()) {
valid_thread_id_(kInvalidThreadId) {
}

void MessageLoopProxyImpl::BindToCurrentThread() {
AutoLock lock(valid_thread_id_lock_);
DCHECK_EQ(kInvalidThreadId, valid_thread_id_);
valid_thread_id_ = PlatformThread::CurrentId();
}

bool MessageLoopProxyImpl::PostDelayedTask(
Expand All @@ -35,6 +41,7 @@ bool MessageLoopProxyImpl::PostNonNestableDelayedTask(
}

bool MessageLoopProxyImpl::RunsTasksOnCurrentThread() const {
AutoLock lock(valid_thread_id_lock_);
return valid_thread_id_ == PlatformThread::CurrentId();
}

Expand Down
8 changes: 7 additions & 1 deletion base/message_loop/message_loop_proxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/pending_task.h"
#include "base/synchronization/lock.h"
#include "base/threading/platform_thread.h"

namespace base {
Expand All @@ -24,6 +25,9 @@ class BASE_EXPORT MessageLoopProxyImpl : public MessageLoopProxy {
explicit MessageLoopProxyImpl(
scoped_refptr<IncomingTaskQueue> incoming_queue);

// Initialize this message loop proxy on the current thread.
void BindToCurrentThread();

// MessageLoopProxy implementation
bool PostDelayedTask(const tracked_objects::Location& from_here,
const base::Closure& task,
Expand All @@ -40,8 +44,10 @@ class BASE_EXPORT MessageLoopProxyImpl : public MessageLoopProxy {
// THe incoming queue receiving all posted tasks.
scoped_refptr<IncomingTaskQueue> incoming_queue_;

// ID of the thread |this| was created on.
// ID of the thread |this| was created on. Could be accessed on multiple
// threads, protected by |valid_thread_id_lock_|.
PlatformThreadId valid_thread_id_;
mutable Lock valid_thread_id_lock_;

DISALLOW_COPY_AND_ASSIGN(MessageLoopProxyImpl);
};
Expand Down
5 changes: 0 additions & 5 deletions base/message_loop/message_pump_perftest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#endif

namespace base {
namespace {

class ScheduleWorkTest : public testing::Test {
public:
Expand Down Expand Up @@ -224,9 +223,6 @@ TEST_F(ScheduleWorkTest, ThreadTimeToJavaFromFourThreads) {
}
#endif

static void DoNothing() {
}

class FakeMessagePump : public MessagePump {
public:
FakeMessagePump() {}
Expand Down Expand Up @@ -289,5 +285,4 @@ TEST_F(PostTaskTest, OneHundredTasksPerReload) {
Run(1000, 100);
}

} // namespace
} // namespace base
4 changes: 4 additions & 0 deletions base/threading/platform_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ class PlatformThreadHandle {
id_(id) {
}

PlatformThreadId id() const {
return id_;
}

bool is_equal(const PlatformThreadHandle& other) const {
return handle_ == other.handle_;
}
Expand Down
5 changes: 3 additions & 2 deletions base/threading/platform_thread_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,16 @@ bool CreateThreadInternal(size_t stack_size,
// have to work running on CreateThread() threads anyway, since we run code
// on the Windows thread pool, etc. For some background on the difference:
// http://www.microsoft.com/msj/1099/win32/win321099.aspx
PlatformThreadId thread_id;
void* thread_handle = CreateThread(
NULL, stack_size, ThreadFunc, params, flags, NULL);
NULL, stack_size, ThreadFunc, params, flags, &thread_id);
if (!thread_handle) {
delete params;
return false;
}

if (out_thread_handle)
*out_thread_handle = PlatformThreadHandle(thread_handle);
*out_thread_handle = PlatformThreadHandle(thread_handle, thread_id);
else
CloseHandle(thread_handle);
return true;
Expand Down
Loading

0 comments on commit 7f68f87

Please sign in to comment.