Skip to content

Commit

Permalink
Cleanup IPC::ChannelProxy to use SingleThreadTaskRunner
Browse files Browse the repository at this point in the history
Also removed ClearIPCMessageLoop() as it is not needed anymore.

Review URL: https://chromiumcodereview.appspot.com/10694014

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@145192 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
sergeyu@chromium.org committed Jul 2, 2012
1 parent ddfc5b0 commit b243230
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 59 deletions.
10 changes: 0 additions & 10 deletions chrome/service/service_ipc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,6 @@ ServiceIPCServer::~ServiceIPCServer() {
#endif

channel_->RemoveFilter(sync_message_filter_.get());

// The ChannelProxy object caches a pointer to the IPC thread, so need to
// reset it as it's not guaranteed to outlive this object.
// NOTE: this also has the side-effect of not closing the main IPC channel to
// the browser process. This is needed because this is the signal that the
// browser uses to know that this process has died, so we need it to be alive
// until this process is shut down, and the OS closes the handle
// automatically. We used to watch the object handle on Windows to do this,
// but it wasn't possible to do so on POSIX.
channel_->ClearIPCMessageLoop();
}

void ServiceIPCServer::OnChannelConnected(int32 peer_pid) {
Expand Down
2 changes: 1 addition & 1 deletion content/common/child_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ ChildThread::~ChildThread() {
// until this process is shut down, and the OS closes the handle
// automatically. We used to watch the object handle on Windows to do this,
// but it wasn't possible to do so on POSIX.
channel_->ClearIPCMessageLoop();
channel_->ClearIPCTaskRunner();
}

void ChildThread::OnChannelError() {
Expand Down
1 change: 1 addition & 0 deletions content/common/indexed_db/indexed_db_message_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "base/bind.h"
#include "base/location.h"
#include "base/message_loop_proxy.h"
#include "content/common/indexed_db/indexed_db_dispatcher.h"
#include "content/common/indexed_db/indexed_db_messages.h"
#include "webkit/glue/worker_task_runner.h"
Expand Down
4 changes: 4 additions & 0 deletions content/common/indexed_db/indexed_db_message_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@

class IndexedDBDispatcher;

namespace base {
class MessageLoopProxy;
} // namespace base

class IndexedDBMessageFilter : public IPC::ChannelProxy::MessageFilter {
public:
IndexedDBMessageFilter();
Expand Down
4 changes: 4 additions & 0 deletions content/public/renderer/render_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

class MessageLoop;

namespace base {
class MessageLoopProxy;
}

namespace IPC {
class SyncChannel;
class SyncMessageFilter;
Expand Down
1 change: 1 addition & 0 deletions content/renderer/gpu/input_event_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "base/bind.h"
#include "base/debug/trace_event.h"
#include "base/location.h"
#include "base/message_loop_proxy.h"
#include "content/common/view_messages.h"
#include "content/renderer/gpu/input_event_filter.h"

Expand Down
1 change: 1 addition & 0 deletions content/test/mock_render_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "content/public/test/mock_render_thread.h"

#include "base/process_util.h"
#include "base/message_loop_proxy.h"
#include "content/common/view_messages.h"
#include "ipc/ipc_message_utils.h"
#include "ipc/ipc_sync_message.h"
Expand Down
45 changes: 26 additions & 19 deletions ipc/ipc_channel_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/single_thread_task_runner.h"
#include "base/thread_task_runner_handle.h"
#include "ipc/ipc_channel_proxy.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_logging.h"
Expand Down Expand Up @@ -43,17 +45,22 @@ ChannelProxy::MessageFilter::~MessageFilter() {}
//------------------------------------------------------------------------------

ChannelProxy::Context::Context(Listener* listener,
base::MessageLoopProxy* ipc_message_loop)
: listener_message_loop_(base::MessageLoopProxy::current()),
base::SingleThreadTaskRunner* ipc_task_runner)
: listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
listener_(listener),
ipc_message_loop_(ipc_message_loop),
ipc_task_runner_(ipc_task_runner),
channel_connected_called_(false),
peer_pid_(base::kNullProcessId) {
DCHECK(ipc_task_runner_);
}

ChannelProxy::Context::~Context() {
}

void ChannelProxy::Context::ClearIPCTaskRunner() {
ipc_task_runner_ = NULL;
}

void ChannelProxy::Context::CreateChannel(const IPC::ChannelHandle& handle,
const Channel::Mode& mode) {
DCHECK(channel_.get() == NULL);
Expand Down Expand Up @@ -94,7 +101,7 @@ bool ChannelProxy::Context::OnMessageReceivedNoFilter(const Message& message) {
// this thread is active. That should be a reasonable assumption, but it
// feels risky. We may want to invent some more indirect way of referring to
// a MessageLoop if this becomes a problem.
listener_message_loop_->PostTask(
listener_task_runner_->PostTask(
FROM_HERE, base::Bind(&Context::OnDispatchMessage, this, message));
return true;
}
Expand All @@ -112,8 +119,8 @@ void ChannelProxy::Context::OnChannelConnected(int32 peer_pid) {
for (size_t i = 0; i < filters_.size(); ++i)
filters_[i]->OnChannelConnected(peer_pid);

// See above comment about using listener_message_loop_ here.
listener_message_loop_->PostTask(
// See above comment about using listener_task_runner_ here.
listener_task_runner_->PostTask(
FROM_HERE, base::Bind(&Context::OnDispatchConnected, this));
}

Expand All @@ -122,8 +129,8 @@ void ChannelProxy::Context::OnChannelError() {
for (size_t i = 0; i < filters_.size(); ++i)
filters_[i]->OnChannelError();

// See above comment about using listener_message_loop_ here.
listener_message_loop_->PostTask(
// See above comment about using listener_task_runner_ here.
listener_task_runner_->PostTask(
FROM_HERE, base::Bind(&Context::OnDispatchError, this));
}

Expand Down Expand Up @@ -214,7 +221,7 @@ void ChannelProxy::Context::OnRemoveFilter(MessageFilter* filter) {
void ChannelProxy::Context::AddFilter(MessageFilter* filter) {
base::AutoLock auto_lock(pending_filters_lock_);
pending_filters_.push_back(make_scoped_refptr(filter));
ipc_message_loop_->PostTask(
ipc_task_runner_->PostTask(
FROM_HERE, base::Bind(&Context::OnAddFilter, this));
}

Expand Down Expand Up @@ -276,8 +283,8 @@ void ChannelProxy::Context::OnDispatchError() {
ChannelProxy::ChannelProxy(const IPC::ChannelHandle& channel_handle,
Channel::Mode mode,
Listener* listener,
base::MessageLoopProxy* ipc_thread)
: context_(new Context(listener, ipc_thread)),
base::SingleThreadTaskRunner* ipc_task_runner)
: context_(new Context(listener, ipc_task_runner)),
outgoing_message_filter_(NULL),
did_init_(false) {
Init(channel_handle, mode, true);
Expand Down Expand Up @@ -314,13 +321,13 @@ void ChannelProxy::Init(const IPC::ChannelHandle& channel_handle,
// to connect and get an error since the pipe doesn't exist yet.
context_->CreateChannel(channel_handle, mode);
} else {
context_->ipc_message_loop()->PostTask(
context_->ipc_task_runner()->PostTask(
FROM_HERE, base::Bind(&Context::CreateChannel, context_.get(),
channel_handle, mode));
}

// complete initialization on the background thread
context_->ipc_message_loop()->PostTask(
context_->ipc_task_runner()->PostTask(
FROM_HERE, base::Bind(&Context::OnChannelOpened, context_.get()));

did_init_ = true;
Expand All @@ -332,8 +339,8 @@ void ChannelProxy::Close() {
// possible that the channel could be closed while it is receiving messages!
context_->Clear();

if (context_->ipc_message_loop()) {
context_->ipc_message_loop()->PostTask(
if (context_->ipc_task_runner()) {
context_->ipc_task_runner()->PostTask(
FROM_HERE, base::Bind(&Context::OnChannelClosed, context_.get()));
}
}
Expand All @@ -347,7 +354,7 @@ bool ChannelProxy::Send(Message* message) {
Logging::GetInstance()->OnSendMessage(message, context_->channel_id());
#endif

context_->ipc_message_loop()->PostTask(
context_->ipc_task_runner()->PostTask(
FROM_HERE,
base::Bind(&ChannelProxy::Context::OnSendMessage,
context_, base::Passed(scoped_ptr<Message>(message))));
Expand All @@ -359,13 +366,13 @@ void ChannelProxy::AddFilter(MessageFilter* filter) {
}

void ChannelProxy::RemoveFilter(MessageFilter* filter) {
context_->ipc_message_loop()->PostTask(
context_->ipc_task_runner()->PostTask(
FROM_HERE, base::Bind(&Context::OnRemoveFilter, context_.get(),
make_scoped_refptr(filter)));
}

void ChannelProxy::ClearIPCMessageLoop() {
context()->ClearIPCMessageLoop();
void ChannelProxy::ClearIPCTaskRunner() {
context()->ClearIPCTaskRunner();
}

#if defined(OS_POSIX) && !defined(OS_NACL)
Expand Down
27 changes: 15 additions & 12 deletions ipc/ipc_channel_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@

#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop_proxy.h"
#include "base/synchronization/lock.h"
#include "ipc/ipc_channel.h"
#include "ipc/ipc_channel_handle.h"
#include "ipc/ipc_listener.h"
#include "ipc/ipc_sender.h"

namespace base {
class SingleThreadTaskRunner;
}

namespace IPC {

class SendCallbackHelper;
Expand Down Expand Up @@ -121,12 +124,12 @@ class IPC_EXPORT ChannelProxy : public Sender {
// method is called on the thread where the IPC::Channel is running. The
// filter may be null if the consumer is not interested in handling messages
// on the background thread. Any message not handled by the filter will be
// dispatched to the listener. The given message loop indicates where the
// IPC::Channel should be created.
// dispatched to the listener. The given task runner correspond to a thread
// on which IPC::Channel is created and used (e.g. IO thread).
ChannelProxy(const IPC::ChannelHandle& channel_handle,
Channel::Mode mode,
Listener* listener,
base::MessageLoopProxy* ipc_thread_loop);
base::SingleThreadTaskRunner* ipc_task_runner);

virtual ~ChannelProxy();

Expand Down Expand Up @@ -167,8 +170,8 @@ class IPC_EXPORT ChannelProxy : public Sender {
outgoing_message_filter_ = filter;
}

// Called to clear the pointer to the IPC message loop when it's going away.
void ClearIPCMessageLoop();
// Called to clear the pointer to the IPC task runner when it's going away.
void ClearIPCTaskRunner();

// Get the process ID for the connected peer.
// Returns base::kNullProcessId if the peer is not connected yet.
Expand All @@ -191,10 +194,10 @@ class IPC_EXPORT ChannelProxy : public Sender {
class Context : public base::RefCountedThreadSafe<Context>,
public Listener {
public:
Context(Listener* listener, base::MessageLoopProxy* ipc_thread);
void ClearIPCMessageLoop() { ipc_message_loop_ = NULL; }
base::MessageLoopProxy* ipc_message_loop() const {
return ipc_message_loop_.get();
Context(Listener* listener, base::SingleThreadTaskRunner* ipc_thread);
void ClearIPCTaskRunner();
base::SingleThreadTaskRunner* ipc_task_runner() const {
return ipc_task_runner_;
}
const std::string& channel_id() const { return channel_id_; }

Expand Down Expand Up @@ -244,12 +247,12 @@ class IPC_EXPORT ChannelProxy : public Sender {
void OnDispatchConnected();
void OnDispatchError();

scoped_refptr<base::MessageLoopProxy> listener_message_loop_;
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
Listener* listener_;

// List of filters. This is only accessed on the IPC thread.
std::vector<scoped_refptr<MessageFilter> > filters_;
scoped_refptr<base::MessageLoopProxy> ipc_message_loop_;
scoped_refptr<base::SingleThreadTaskRunner> ipc_task_runner_;
scoped_ptr<Channel> channel_;
std::string channel_id_;
bool channel_connected_called_;
Expand Down
29 changes: 15 additions & 14 deletions ipc/ipc_sync_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/threading/thread_local.h"
#include "base/synchronization/waitable_event.h"
#include "base/synchronization/waitable_event_watcher.h"
#include "base/thread_task_runner_handle.h"
#include "base/threading/thread_local.h"
#include "ipc/ipc_sync_message.h"

using base::TimeDelta;
Expand Down Expand Up @@ -69,7 +70,7 @@ class SyncChannel::ReceivedSyncMsgQueue :

dispatch_event_.Signal();
if (!was_task_pending) {
listener_message_loop_->PostTask(
listener_task_runner_->PostTask(
FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchMessagesTask,
this, scoped_refptr<SyncContext>(context)));
}
Expand Down Expand Up @@ -145,8 +146,8 @@ class SyncChannel::ReceivedSyncMsgQueue :
}

WaitableEvent* dispatch_event() { return &dispatch_event_; }
base::MessageLoopProxy* listener_message_loop() {
return listener_message_loop_;
base::SingleThreadTaskRunner* listener_task_runner() {
return listener_task_runner_;
}

// Holds a pointer to the per-thread ReceivedSyncMsgQueue object.
Expand Down Expand Up @@ -182,7 +183,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
ReceivedSyncMsgQueue() :
message_queue_version_(0),
dispatch_event_(true, false),
listener_message_loop_(base::MessageLoopProxy::current()),
listener_task_runner_(base::ThreadTaskRunnerHandle::Get()),
task_pending_(false),
listener_count_(0),
top_send_done_watcher_(NULL) {
Expand All @@ -207,7 +208,7 @@ class SyncChannel::ReceivedSyncMsgQueue :
// sender needs its reply before it can reply to our original synchronous
// message.
WaitableEvent dispatch_event_;
scoped_refptr<base::MessageLoopProxy> listener_message_loop_;
scoped_refptr<base::SingleThreadTaskRunner> listener_task_runner_;
base::Lock message_lock_;
bool task_pending_;
int listener_count_;
Expand All @@ -224,9 +225,9 @@ base::LazyInstance<base::ThreadLocalPointer<SyncChannel::ReceivedSyncMsgQueue> >

SyncChannel::SyncContext::SyncContext(
Listener* listener,
base::MessageLoopProxy* ipc_thread,
base::SingleThreadTaskRunner* ipc_task_runner,
WaitableEvent* shutdown_event)
: ChannelProxy::Context(listener, ipc_thread),
: ChannelProxy::Context(listener, ipc_task_runner),
received_sync_msgs_(ReceivedSyncMsgQueue::AddContext()),
shutdown_event_(shutdown_event),
restrict_dispatch_group_(kRestrictDispatchGroup_None) {
Expand Down Expand Up @@ -273,7 +274,7 @@ bool SyncChannel::SyncContext::Pop() {
// blocking Send() call, whose reply we received after we made this last
// Send() call. So check if we have any queued replies available that
// can now unblock the listener thread.
ipc_message_loop()->PostTask(
ipc_task_runner()->PostTask(
FROM_HERE, base::Bind(&ReceivedSyncMsgQueue::DispatchReplies,
received_sync_msgs_.get()));

Expand Down Expand Up @@ -388,20 +389,20 @@ SyncChannel::SyncChannel(
const IPC::ChannelHandle& channel_handle,
Channel::Mode mode,
Listener* listener,
base::MessageLoopProxy* ipc_message_loop,
base::SingleThreadTaskRunner* ipc_task_runner,
bool create_pipe_now,
WaitableEvent* shutdown_event)
: ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)),
: ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
sync_messages_with_no_timeout_allowed_(true) {
ChannelProxy::Init(channel_handle, mode, create_pipe_now);
StartWatching();
}

SyncChannel::SyncChannel(
Listener* listener,
base::MessageLoopProxy* ipc_message_loop,
base::SingleThreadTaskRunner* ipc_task_runner,
WaitableEvent* shutdown_event)
: ChannelProxy(new SyncContext(listener, ipc_message_loop, shutdown_event)),
: ChannelProxy(new SyncContext(listener, ipc_task_runner, shutdown_event)),
sync_messages_with_no_timeout_allowed_(true) {
StartWatching();
}
Expand Down Expand Up @@ -443,7 +444,7 @@ bool SyncChannel::SendWithTimeout(Message* message, int timeout_ms) {
// We use the sync message id so that when a message times out, we don't
// confuse it with another send that is either above/below this Send in
// the call stack.
context->ipc_message_loop()->PostDelayedTask(
context->ipc_task_runner()->PostDelayedTask(
FROM_HERE,
base::Bind(&SyncContext::OnSendTimeout, context.get(), message_id),
base::TimeDelta::FromMilliseconds(timeout_ms));
Expand Down
Loading

0 comments on commit b243230

Please sign in to comment.