Skip to content

Commit

Permalink
AssociatedGroupController impls: reduce message pipes created for syn…
Browse files Browse the repository at this point in the history
…c message signalling.

Previously, we always create a message pipe for sync message signalling when
SignalSyncMessageEvent() is called. And this method is always called when peer
endpoint is closed. Most of time, this message pipe is not used.

BUG=None
TBR=rockot@chromium.org
(rockot has LGed and is an owner of all files. But somehow presubmit check thought ipc_mojo_bootstrap.cc needed an OWNER review.)

Review-Url: https://codereview.chromium.org/2718253005
Cr-Commit-Position: refs/heads/master@{#453714}
  • Loading branch information
yzshen authored and Commit bot committed Feb 28, 2017
1 parent e883e24 commit e25b5d5
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 36 deletions.
13 changes: 8 additions & 5 deletions ipc/ipc_mojo_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,9 @@ class ChannelAssociatedGroupController

void SignalSyncMessageEvent() {
controller_->lock_.AssertAcquired();
EnsureSyncMessageEventExists();
sync_message_event_->Signal();

if (sync_message_event_)
sync_message_event_->Signal();
}

MessageWrapper PopSyncMessage(uint32_t id) {
Expand Down Expand Up @@ -489,9 +490,11 @@ class ChannelAssociatedGroupController

{
base::AutoLock locker(controller_->lock_);
EnsureSyncMessageEventExists();
if (!sync_messages_.empty())
SignalSyncMessageEvent();
if (!sync_message_event_) {
sync_message_event_.reset(new MojoEvent);
if (peer_closed_ || !sync_messages_.empty())
SignalSyncMessageEvent();
}
}

sync_watcher_.reset(new mojo::SyncHandleWatcher(
Expand Down
60 changes: 31 additions & 29 deletions mojo/public/cpp/bindings/lib/multiplex_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ class MultiplexRouter::InterfaceEndpoint
if (event_signalled_)
return;

EnsureEventMessagePipeExists();
event_signalled_ = true;
if (!sync_message_event_sender_.is_valid())
return;

MojoResult result =
WriteMessageRaw(sync_message_event_sender_.get(), nullptr, 0, nullptr,
0, MOJO_WRITE_MESSAGE_FLAG_NONE);
Expand All @@ -125,12 +127,14 @@ class MultiplexRouter::InterfaceEndpoint
if (!event_signalled_)
return;

DCHECK(sync_message_event_receiver_.is_valid());
event_signalled_ = false;
if (!sync_message_event_receiver_.is_valid())
return;

MojoResult result =
ReadMessageRaw(sync_message_event_receiver_.get(), nullptr, nullptr,
nullptr, nullptr, MOJO_READ_MESSAGE_FLAG_MAY_DISCARD);
DCHECK_EQ(MOJO_RESULT_OK, result);
event_signalled_ = false;
}

// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -203,29 +207,27 @@ class MultiplexRouter::InterfaceEndpoint

{
MayAutoLock locker(&router_->lock_);
EnsureEventMessagePipeExists();

auto iter = router_->sync_message_tasks_.find(id_);
if (iter != router_->sync_message_tasks_.end() && !iter->second.empty())
SignalSyncMessageEvent();
if (!sync_message_event_sender_.is_valid()) {
MojoResult result =
CreateMessagePipe(nullptr, &sync_message_event_sender_,
&sync_message_event_receiver_);
DCHECK_EQ(MOJO_RESULT_OK, result);

if (event_signalled_) {
// Reset the flag so that SignalSyncMessageEvent() will actually
// signal using the newly-created message pipe.
event_signalled_ = false;
SignalSyncMessageEvent();
}
}
}

sync_watcher_.reset(new SyncHandleWatcher(
sync_message_event_receiver_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&InterfaceEndpoint::OnHandleReady, base::Unretained(this))));
}

void EnsureEventMessagePipeExists() {
router_->AssertLockAcquired();

if (sync_message_event_receiver_.is_valid())
return;

MojoResult result = CreateMessagePipe(nullptr, &sync_message_event_sender_,
&sync_message_event_receiver_);
DCHECK_EQ(MOJO_RESULT_OK, result);
}

// ---------------------------------------------------------------------------
// The following members are safe to access from any threads.

Expand Down Expand Up @@ -581,8 +583,11 @@ void MultiplexRouter::ResumeIncomingMethodCallProcessing() {

for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) {
auto sync_iter = sync_message_tasks_.find(iter->first);
if (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty())
if (iter->second->peer_closed() ||
(sync_iter != sync_message_tasks_.end() &&
!sync_iter->second.empty())) {
iter->second->SignalSyncMessageEvent();
}
}

ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr);
Expand Down Expand Up @@ -926,16 +931,13 @@ void MultiplexRouter::LockAndCallProcessTasks() {
void MultiplexRouter::UpdateEndpointStateMayRemove(
InterfaceEndpoint* endpoint,
EndpointStateUpdateType type) {
switch (type) {
case ENDPOINT_CLOSED:
endpoint->set_closed();
break;
case PEER_ENDPOINT_CLOSED:
endpoint->set_peer_closed();
// If the interface endpoint is performing a sync watch, this makes sure
// it is notified and eventually exits the sync watch.
endpoint->SignalSyncMessageEvent();
break;
if (type == ENDPOINT_CLOSED) {
endpoint->set_closed();
} else {
endpoint->set_peer_closed();
// If the interface endpoint is performing a sync watch, this makes sure
// it is notified and eventually exits the sync watch.
endpoint->SignalSyncMessageEvent();
}
if (endpoint->closed() && endpoint->peer_closed())
endpoints_.erase(endpoint->id());
Expand Down
4 changes: 2 additions & 2 deletions mojo/public/cpp/bindings/tests/sync_method_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ TestSync::AsyncEchoCallback BindAsyncEchoCallback(Func func) {
return base::Bind(&CallAsyncEchoCallback<Func>, func);
}

// TestSync and TestSyncMaster exercise Router and MultiplexRouter,
// respectively.
// TestSync (without associated interfaces) and TestSyncMaster (with associated
// interfaces) exercise MultiplexRouter with different configurations.
using InterfaceTypes = testing::Types<TestSync, TestSyncMaster>;
TYPED_TEST_CASE(SyncMethodCommonTest, InterfaceTypes);

Expand Down

0 comments on commit e25b5d5

Please sign in to comment.