From b24323092173e732bb98c7aed0ab20d71e63893a Mon Sep 17 00:00:00 2001 From: "sergeyu@chromium.org" Date: Mon, 2 Jul 2012 21:15:52 +0000 Subject: [PATCH] Cleanup IPC::ChannelProxy to use SingleThreadTaskRunner Also removed ClearIPCMessageLoop() as it is not needed anymore. Review URL: https://chromiumcodereview.appspot.com/10694014 git-svn-id: svn://svn.chromium.org/chrome/trunk/src@145192 0039d316-1c4b-4281-b951-d872f2087c98 --- chrome/service/service_ipc_server.cc | 10 ----- content/common/child_thread.cc | 2 +- .../indexed_db/indexed_db_message_filter.cc | 1 + .../indexed_db/indexed_db_message_filter.h | 4 ++ content/public/renderer/render_thread.h | 4 ++ content/renderer/gpu/input_event_filter.cc | 1 + content/test/mock_render_thread.cc | 1 + ipc/ipc_channel_proxy.cc | 45 +++++++++++-------- ipc/ipc_channel_proxy.h | 27 ++++++----- ipc/ipc_sync_channel.cc | 29 ++++++------ ipc/ipc_sync_channel.h | 6 +-- 11 files changed, 71 insertions(+), 59 deletions(-) diff --git a/chrome/service/service_ipc_server.cc b/chrome/service/service_ipc_server.cc index fc006554a9b7a9..0f4060e6b8b188 100644 --- a/chrome/service/service_ipc_server.cc +++ b/chrome/service/service_ipc_server.cc @@ -39,16 +39,6 @@ ServiceIPCServer::~ServiceIPCServer() { #endif channel_->RemoveFilter(sync_message_filter_.get()); - - // The ChannelProxy object caches a pointer to the IPC thread, so need to - // reset it as it's not guaranteed to outlive this object. - // NOTE: this also has the side-effect of not closing the main IPC channel to - // the browser process. This is needed because this is the signal that the - // browser uses to know that this process has died, so we need it to be alive - // until this process is shut down, and the OS closes the handle - // automatically. We used to watch the object handle on Windows to do this, - // but it wasn't possible to do so on POSIX. - channel_->ClearIPCMessageLoop(); } void ServiceIPCServer::OnChannelConnected(int32 peer_pid) { diff --git a/content/common/child_thread.cc b/content/common/child_thread.cc index 3f00f93e120f25..e2f1b204cece53 100644 --- a/content/common/child_thread.cc +++ b/content/common/child_thread.cc @@ -83,7 +83,7 @@ ChildThread::~ChildThread() { // until this process is shut down, and the OS closes the handle // automatically. We used to watch the object handle on Windows to do this, // but it wasn't possible to do so on POSIX. - channel_->ClearIPCMessageLoop(); + channel_->ClearIPCTaskRunner(); } void ChildThread::OnChannelError() { diff --git a/content/common/indexed_db/indexed_db_message_filter.cc b/content/common/indexed_db/indexed_db_message_filter.cc index 66a0fc6fe71ff9..1d90bd2e8bce2c 100644 --- a/content/common/indexed_db/indexed_db_message_filter.cc +++ b/content/common/indexed_db/indexed_db_message_filter.cc @@ -6,6 +6,7 @@ #include "base/bind.h" #include "base/location.h" +#include "base/message_loop_proxy.h" #include "content/common/indexed_db/indexed_db_dispatcher.h" #include "content/common/indexed_db/indexed_db_messages.h" #include "webkit/glue/worker_task_runner.h" diff --git a/content/common/indexed_db/indexed_db_message_filter.h b/content/common/indexed_db/indexed_db_message_filter.h index dcfa7445534141..76e651f97581b9 100644 --- a/content/common/indexed_db/indexed_db_message_filter.h +++ b/content/common/indexed_db/indexed_db_message_filter.h @@ -10,6 +10,10 @@ class IndexedDBDispatcher; +namespace base { +class MessageLoopProxy; +} // namespace base + class IndexedDBMessageFilter : public IPC::ChannelProxy::MessageFilter { public: IndexedDBMessageFilter(); diff --git a/content/public/renderer/render_thread.h b/content/public/renderer/render_thread.h index 1d12f80454faae..1c100849c38603 100644 --- a/content/public/renderer/render_thread.h +++ b/content/public/renderer/render_thread.h @@ -17,6 +17,10 @@ class MessageLoop; +namespace base { +class MessageLoopProxy; +} + namespace IPC { class SyncChannel; class SyncMessageFilter; diff --git a/content/renderer/gpu/input_event_filter.cc b/content/renderer/gpu/input_event_filter.cc index 53bc9a2f0946de..6a49fcc9d4575c 100644 --- a/content/renderer/gpu/input_event_filter.cc +++ b/content/renderer/gpu/input_event_filter.cc @@ -5,6 +5,7 @@ #include "base/bind.h" #include "base/debug/trace_event.h" #include "base/location.h" +#include "base/message_loop_proxy.h" #include "content/common/view_messages.h" #include "content/renderer/gpu/input_event_filter.h" diff --git a/content/test/mock_render_thread.cc b/content/test/mock_render_thread.cc index 452c975755f382..81437184ac23d3 100644 --- a/content/test/mock_render_thread.cc +++ b/content/test/mock_render_thread.cc @@ -5,6 +5,7 @@ #include "content/public/test/mock_render_thread.h" #include "base/process_util.h" +#include "base/message_loop_proxy.h" #include "content/common/view_messages.h" #include "ipc/ipc_message_utils.h" #include "ipc/ipc_sync_message.h" diff --git a/ipc/ipc_channel_proxy.cc b/ipc/ipc_channel_proxy.cc index 6d2209da4acb79..0668b151a85873 100644 --- a/ipc/ipc_channel_proxy.cc +++ b/ipc/ipc_channel_proxy.cc @@ -8,6 +8,8 @@ #include "base/location.h" #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" +#include "base/single_thread_task_runner.h" +#include "base/thread_task_runner_handle.h" #include "ipc/ipc_channel_proxy.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_logging.h" @@ -43,17 +45,22 @@ ChannelProxy::MessageFilter::~MessageFilter() {} //------------------------------------------------------------------------------ ChannelProxy::Context::Context(Listener* listener, - base::MessageLoopProxy* ipc_message_loop) - : listener_message_loop_(base::MessageLoopProxy::current()), + base::SingleThreadTaskRunner* ipc_task_runner) + : listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), listener_(listener), - ipc_message_loop_(ipc_message_loop), + ipc_task_runner_(ipc_task_runner), channel_connected_called_(false), peer_pid_(base::kNullProcessId) { + DCHECK(ipc_task_runner_); } ChannelProxy::Context::~Context() { } +void ChannelProxy::Context::ClearIPCTaskRunner() { + ipc_task_runner_ = NULL; +} + void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle, const Channel::Mode& mode) { DCHECK(channel_.get() == NULL); @@ -94,7 +101,7 @@ bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) { // this thread is active. That should be a reasonable assumption, but it // feels risky. We may want to invent some more indirect way of referring to // a MessageLoop if this becomes a problem. - listener_message_loop_->PostTask( + listener_task_runner_->PostTask( FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message)); return true; } @@ -112,8 +119,8 @@ void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) { for (size_t i = 0; i < filters_.size(); ++i) filters_[i]->OnChannelConnected(peer_pid); - // See above comment about using listener_message_loop_ here. - listener_message_loop_->PostTask( + // See above comment about using listener_task_runner_ here. + listener_task_runner_->PostTask( FROM_HERE, base::Bind(&Context::OnDispatchConnected, this)); } @@ -122,8 +129,8 @@ void ChannelProxy::Context::OnChannelError() { for (size_t i = 0; i < filters_.size(); ++i) filters_[i]->OnChannelError(); - // See above comment about using listener_message_loop_ here. - listener_message_loop_->PostTask( + // See above comment about using listener_task_runner_ here. + listener_task_runner_->PostTask( FROM_HERE, base::Bind(&Context::OnDispatchError, this)); } @@ -214,7 +221,7 @@ void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) { void ChannelProxy::Context::AddFilter(MessageFilter* filter) { base::AutoLock auto_lock(pending_filters_lock_); pending_filters_.push_back(make_scoped_refptr(filter)); - ipc_message_loop_->PostTask( + ipc_task_runner_->PostTask( FROM_HERE, base::Bind(&Context::OnAddFilter, this)); } @@ -276,8 +283,8 @@ void ChannelProxy::Context::OnDispatchError() { ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle, Channel::Mode mode, Listener* listener, - base::MessageLoopProxy* ipc_thread) - : context_(new Context(listener, ipc_thread)), + base::SingleThreadTaskRunner* ipc_task_runner) + : context_(new Context(listener, ipc_task_runner)), outgoing_message_filter_(NULL), did_init_(false) { Init(channel_handle, mode, true); @@ -314,13 +321,13 @@ void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle, // to connect and get an error since the pipe doesn't exist yet. context_->CreateChannel(channel_handle, mode); } else { - context_->ipc_message_loop()->PostTask( + context_->ipc_task_runner()->PostTask( FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(), channel_handle, mode)); } // complete initialization on the background thread - context_->ipc_message_loop()->PostTask( + context_->ipc_task_runner()->PostTask( FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get())); did_init_ = true; @@ -332,8 +339,8 @@ void ChannelProxy::Close() { // possible that the channel could be closed while it is receiving messages! context_->Clear(); - if (context_->ipc_message_loop()) { - context_->ipc_message_loop()->PostTask( + if (context_->ipc_task_runner()) { + context_->ipc_task_runner()->PostTask( FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get())); } } @@ -347,7 +354,7 @@ bool ChannelProxy::Send(Message* message) { Logging::GetInstance()->OnSendMessage(message, context_->channel_id()); #endif - context_->ipc_message_loop()->PostTask( + context_->ipc_task_runner()->PostTask( FROM_HERE, base::Bind(&ChannelProxy::Context::OnSendMessage, context_, base::Passed(scoped_ptr(message)))); @@ -359,13 +366,13 @@ void ChannelProxy::AddFilter(MessageFilter* filter) { } void ChannelProxy::RemoveFilter(MessageFilter* filter) { - context_->ipc_message_loop()->PostTask( + context_->ipc_task_runner()->PostTask( FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(), make_scoped_refptr(filter))); } -void ChannelProxy::ClearIPCMessageLoop() { - context()->ClearIPCMessageLoop(); +void ChannelProxy::ClearIPCTaskRunner() { + context()->ClearIPCTaskRunner(); } #if defined(OS_POSIX) && !defined(OS_NACL) diff --git a/ipc/ipc_channel_proxy.h b/ipc/ipc_channel_proxy.h index 9989660ee9acef..4311054befe40e 100644 --- a/ipc/ipc_channel_proxy.h +++ b/ipc/ipc_channel_proxy.h @@ -10,13 +10,16 @@ #include "base/memory/ref_counted.h" #include "base/memory/scoped_ptr.h" -#include "base/message_loop_proxy.h" #include "base/synchronization/lock.h" #include "ipc/ipc_channel.h" #include "ipc/ipc_channel_handle.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_sender.h" +namespace base { +class SingleThreadTaskRunner; +} + namespace IPC { class SendCallbackHelper; @@ -121,12 +124,12 @@ class IPC_EXPORT ChannelProxy : public Sender { // method is called on the thread where the IPC::Channel is running. The // filter may be null if the consumer is not interested in handling messages // on the background thread. Any message not handled by the filter will be - // dispatched to the listener. The given message loop indicates where the - // IPC::Channel should be created. + // dispatched to the listener. The given task runner correspond to a thread + // on which IPC::Channel is created and used (e.g. IO thread). ChannelProxy(const IPC::ChannelHandle& channel_handle, Channel::Mode mode, Listener* listener, - base::MessageLoopProxy* ipc_thread_loop); + base::SingleThreadTaskRunner* ipc_task_runner); virtual ~ChannelProxy(); @@ -167,8 +170,8 @@ class IPC_EXPORT ChannelProxy : public Sender { outgoing_message_filter_ = filter; } - // Called to clear the pointer to the IPC message loop when it's going away. - void ClearIPCMessageLoop(); + // Called to clear the pointer to the IPC task runner when it's going away. + void ClearIPCTaskRunner(); // Get the process ID for the connected peer. // Returns base::kNullProcessId if the peer is not connected yet. @@ -191,10 +194,10 @@ class IPC_EXPORT ChannelProxy : public Sender { class Context : public base::RefCountedThreadSafe, public Listener { public: - Context(Listener* listener, base::MessageLoopProxy* ipc_thread); - void ClearIPCMessageLoop() { ipc_message_loop_ = NULL; } - base::MessageLoopProxy* ipc_message_loop() const { - return ipc_message_loop_.get(); + Context(Listener* listener, base::SingleThreadTaskRunner* ipc_thread); + void ClearIPCTaskRunner(); + base::SingleThreadTaskRunner* ipc_task_runner() const { + return ipc_task_runner_; } const std::string& channel_id() const { return channel_id_; } @@ -244,12 +247,12 @@ class IPC_EXPORT ChannelProxy : public Sender { void OnDispatchConnected(); void OnDispatchError(); - scoped_refptr listener_message_loop_; + scoped_refptr listener_task_runner_; Listener* listener_; // List of filters. This is only accessed on the IPC thread. std::vector > filters_; - scoped_refptr ipc_message_loop_; + scoped_refptr ipc_task_runner_; scoped_ptr channel_; std::string channel_id_; bool channel_connected_called_; diff --git a/ipc/ipc_sync_channel.cc b/ipc/ipc_sync_channel.cc index 26636e59595c57..9a897b534bd333 100644 --- a/ipc/ipc_sync_channel.cc +++ b/ipc/ipc_sync_channel.cc @@ -8,9 +8,10 @@ #include "base/lazy_instance.h" #include "base/location.h" #include "base/logging.h" -#include "base/threading/thread_local.h" #include "base/synchronization/waitable_event.h" #include "base/synchronization/waitable_event_watcher.h" +#include "base/thread_task_runner_handle.h" +#include "base/threading/thread_local.h" #include "ipc/ipc_sync_message.h" using base::TimeDelta; @@ -69,7 +70,7 @@ class SyncChannel::ReceivedSyncMsgQueue : dispatch_event_.Signal(); if (!was_task_pending) { - listener_message_loop_->PostTask( + listener_task_runner_->PostTask( FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask, this, scoped_refptr(context))); } @@ -145,8 +146,8 @@ class SyncChannel::ReceivedSyncMsgQueue : } WaitableEvent* dispatch_event() { return &dispatch_event_; } - base::MessageLoopProxy* listener_message_loop() { - return listener_message_loop_; + base::SingleThreadTaskRunner* listener_task_runner() { + return listener_task_runner_; } // Holds a pointer to the per-thread ReceivedSyncMsgQueue object. @@ -182,7 +183,7 @@ class SyncChannel::ReceivedSyncMsgQueue : ReceivedSyncMsgQueue() : message_queue_version_(0), dispatch_event_(true, false), - listener_message_loop_(base::MessageLoopProxy::current()), + listener_task_runner_(base::ThreadTaskRunnerHandle::Get()), task_pending_(false), listener_count_(0), top_send_done_watcher_(NULL) { @@ -207,7 +208,7 @@ class SyncChannel::ReceivedSyncMsgQueue : // sender needs its reply before it can reply to our original synchronous // message. WaitableEvent dispatch_event_; - scoped_refptr listener_message_loop_; + scoped_refptr listener_task_runner_; base::Lock message_lock_; bool task_pending_; int listener_count_; @@ -224,9 +225,9 @@ base::LazyInstance > SyncChannel::SyncContext::SyncContext( Listener* listener, - base::MessageLoopProxy* ipc_thread, + base::SingleThreadTaskRunner* ipc_task_runner, WaitableEvent* shutdown_event) - : ChannelProxy::Context(listener, ipc_thread), + : ChannelProxy::Context(listener, ipc_task_runner), received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()), shutdown_event_(shutdown_event), restrict_dispatch_group_(kRestrictDispatchGroup_None) { @@ -273,7 +274,7 @@ bool SyncChannel::SyncContext::Pop() { // blocking Send() call, whose reply we received after we made this last // Send() call. So check if we have any queued replies available that // can now unblock the listener thread. - ipc_message_loop()->PostTask( + ipc_task_runner()->PostTask( FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies, received_sync_msgs_.get())); @@ -388,10 +389,10 @@ SyncChannel::SyncChannel( const IPC::ChannelHandle& channel_handle, Channel::Mode mode, Listener* listener, - base::MessageLoopProxy* ipc_message_loop, + base::SingleThreadTaskRunner* ipc_task_runner, bool create_pipe_now, WaitableEvent* shutdown_event) - : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), + : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), sync_messages_with_no_timeout_allowed_(true) { ChannelProxy::Init(channel_handle, mode, create_pipe_now); StartWatching(); @@ -399,9 +400,9 @@ SyncChannel::SyncChannel( SyncChannel::SyncChannel( Listener* listener, - base::MessageLoopProxy* ipc_message_loop, + base::SingleThreadTaskRunner* ipc_task_runner, WaitableEvent* shutdown_event) - : ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)), + : ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)), sync_messages_with_no_timeout_allowed_(true) { StartWatching(); } @@ -443,7 +444,7 @@ bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) { // We use the sync message id so that when a message times out, we don't // confuse it with another send that is either above/below this Send in // the call stack. - context->ipc_message_loop()->PostDelayedTask( + context->ipc_task_runner()->PostDelayedTask( FROM_HERE, base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id), base::TimeDelta::FromMilliseconds(timeout_ms)); diff --git a/ipc/ipc_sync_channel.h b/ipc/ipc_sync_channel.h index c6ebde167ec40a..dd610ee26e18f6 100644 --- a/ipc/ipc_sync_channel.h +++ b/ipc/ipc_sync_channel.h @@ -71,7 +71,7 @@ class IPC_EXPORT SyncChannel : public ChannelProxy, SyncChannel(const IPC::ChannelHandle& channel_handle, Channel::Mode mode, Listener* listener, - base::MessageLoopProxy* ipc_message_loop, + base::SingleThreadTaskRunner* ipc_task_runner, bool create_pipe_now, base::WaitableEvent* shutdown_event); @@ -79,7 +79,7 @@ class IPC_EXPORT SyncChannel : public ChannelProxy, // initialize the channel. This two-step setup allows message filters to be // added before any messages are sent or received. SyncChannel(Listener* listener, - base::MessageLoopProxy* ipc_message_loop, + base::SingleThreadTaskRunner* ipc_task_runner, base::WaitableEvent* shutdown_event); virtual ~SyncChannel(); @@ -120,7 +120,7 @@ class IPC_EXPORT SyncChannel : public ChannelProxy, public base::WaitableEventWatcher::Delegate { public: SyncContext(Listener* listener, - base::MessageLoopProxy* ipc_thread, + base::SingleThreadTaskRunner* ipc_task_runner, base::WaitableEvent* shutdown_event); // Adds information about an outgoing sync message to the context so that