Skip to content

Commit

Permalink
Mojo: Add "attach and run" methods to Channel and ChannelEndpoint.
Browse files Browse the repository at this point in the history
Currently, we can only handle the bootstrap case, so only convert (some
of) those. More work (elsewhere) will be needed to handle the
non-bootstrap case. (I've temporarily NOTREACHED() the code in
Channel::AttachAndRunEndpoint() for the non-bootstrap case, though the
code should be correct as-is, if not functional without other work yet.)

R=brettw@chromium.org

Review URL: https://codereview.chromium.org/645693005

Cr-Commit-Position: refs/heads/master@{#299546}
  • Loading branch information
viettrungluu-cr authored and Commit bot committed Oct 14, 2014
1 parent bfa9a4d commit f4e1eec
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 61 deletions.
16 changes: 1 addition & 15 deletions mojo/edk/embedder/embedder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "mojo/edk/embedder/platform_support.h"
#include "mojo/edk/system/channel.h"
#include "mojo/edk/system/channel_endpoint.h"
#include "mojo/edk/system/channel_endpoint_id.h"
#include "mojo/edk/system/core.h"
#include "mojo/edk/system/entrypoints.h"
#include "mojo/edk/system/message_pipe_dispatcher.h"
Expand Down Expand Up @@ -57,20 +56,7 @@ scoped_refptr<system::Channel> MakeChannel(
// Once |Init()| has succeeded, we have to return |channel| (since
// |Shutdown()| will have to be called on it).

// Attach the endpoint.
system::ChannelEndpointId endpoint_id =
channel->AttachEndpoint(channel_endpoint);
if (!endpoint_id.is_valid()) {
// This means that, e.g., the other endpoint of the message pipe was closed
// first. But it's not necessarily an error per se.
DVLOG(2) << "Channel::AttachEndpoint() failed";
return channel;
}
CHECK_EQ(endpoint_id, system::ChannelEndpointId::GetBootstrap());

channel->RunEndpoint(channel_endpoint,
system::ChannelEndpointId::GetBootstrap());

channel->AttachAndRunEndpoint(channel_endpoint, true);
return channel;
}

Expand Down
36 changes: 36 additions & 0 deletions mojo/edk/system/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,42 @@ void Channel::RunEndpoint(scoped_refptr<ChannelEndpoint> endpoint,
endpoint->Run(remote_id);
}

void Channel::AttachAndRunEndpoint(scoped_refptr<ChannelEndpoint> endpoint,
bool is_bootstrap) {
DCHECK(endpoint.get());

ChannelEndpointId local_id;
ChannelEndpointId remote_id;
{
base::AutoLock locker(lock_);

DLOG_IF(WARNING, is_shutting_down_)
<< "AttachEndpoint() while shutting down";

if (is_bootstrap) {
local_id = ChannelEndpointId::GetBootstrap();
DCHECK(local_id_to_endpoint_map_.find(local_id) ==
local_id_to_endpoint_map_.end());

remote_id = ChannelEndpointId::GetBootstrap();
} else {
// TODO(vtl): More work needs to be done to enable the non-bootstrap case.
NOTREACHED() << "Non-bootstrap case not yet fully implemented";
do {
local_id = local_id_generator_.GetNext();
} while (local_id_to_endpoint_map_.find(local_id) !=
local_id_to_endpoint_map_.end());

// TODO(vtl): We also need to check for collisions of remote IDs here.
remote_id = remote_id_generator_.GetNext();
}

local_id_to_endpoint_map_[local_id] = endpoint;
}

endpoint->AttachAndRun(this, local_id, remote_id);
}

void Channel::RunRemoteMessagePipeEndpoint(ChannelEndpointId local_id,
ChannelEndpointId remote_id) {
#if DCHECK_IS_ON
Expand Down
31 changes: 19 additions & 12 deletions mojo/edk/system/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,25 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
// may be called multiple times, or not at all.)
void WillShutdownSoon();

// Attaches the given endpoint to this channel. This assigns it a local ID,
// which it returns. The first endpoint attached will always have
// |ChannelEndpointId::GetBootstrap()| as its local ID. (For bootstrapping,
// this occurs on both sides, so one should use that same bootstrap ID for the
// remote ID for the first message pipe across a channel.) Returns an invalid
// |ChannelEndpointId| on failure.
// TODO(vtl): This should be combined with "run", and it should take a
// |ChannelEndpoint| instead.
// TODO(vtl): Maybe limit the number of attached message pipes.
// TODO(vtl): Remove these once |AttachAndRunEndpoint()| is fully implemented
// and everything is converted to use it.
ChannelEndpointId AttachEndpoint(scoped_refptr<ChannelEndpoint> endpoint);

// Runs the given endpoint (which must have been attached to this |Channel|,
// and not detached), assigning it the specified |remote_id|.
void RunEndpoint(scoped_refptr<ChannelEndpoint> endpoint,
ChannelEndpointId remote_id);

// Attaches the given endpoint to this channel and runs it. |is_bootstrap|
// should be set if and only if it is the first endpoint on the channel. This
// assigns the endpoint both local and remote IDs. If |is_bootstrap| is set,
// both are the bootstrap ID (given by |ChannelEndpointId::GetBootstrap()|).
//
// (Bootstrapping is symmetric: Both sides attach and run endpoints with
// |is_bootstrap| set, which establishes the first message pipe across a
// channel.)
//
// TODO(vtl): Maybe limit the number of attached message pipes.
void AttachAndRunEndpoint(scoped_refptr<ChannelEndpoint> endpoint,
bool is_bootstrap);

// Tells the other side of the channel to run a message pipe endpoint (which
// must already be attached); |local_id| and |remote_id| are relative to this
// channel (i.e., |local_id| is the other side's remote ID and |remote_id| is
Expand Down Expand Up @@ -183,6 +186,10 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
// Note: The IDs generated by this should be checked for existence before use.
LocalChannelEndpointIdGenerator local_id_generator_;

// TODO(vtl): We need to keep track of remote IDs (so that we don't collide
// if/when we wrap).
RemoteChannelEndpointIdGenerator remote_id_generator_;

DISALLOW_COPY_AND_ASSIGN(Channel);
};

Expand Down
21 changes: 21 additions & 0 deletions mojo/edk/system/channel_endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ void ChannelEndpoint::Run(ChannelEndpointId remote_id) {
}
}

void ChannelEndpoint::AttachAndRun(Channel* channel,
ChannelEndpointId local_id,
ChannelEndpointId remote_id) {
DCHECK(channel);
DCHECK(local_id.is_valid());
DCHECK(remote_id.is_valid());

base::AutoLock locker(lock_);
DCHECK(!channel_);
DCHECK(!local_id_.is_valid());
DCHECK(!remote_id_.is_valid());
channel_ = channel;
local_id_ = local_id;
remote_id_ = remote_id;

while (!paused_message_queue_.IsEmpty()) {
LOG_IF(WARNING, !WriteMessageNoLock(paused_message_queue_.GetMessage()))
<< "Failed to write enqueue message to channel";
}
}

bool ChannelEndpoint::OnReadMessage(
const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles) {
Expand Down
11 changes: 8 additions & 3 deletions mojo/edk/system/channel_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,17 @@ class MOJO_SYSTEM_IMPL_EXPORT ChannelEndpoint

// Methods called by |Channel|:

// Called by |Channel| when it takes a reference to this object.
// TODO(vtl): Remove these once we've switched over to |AttachAndRun()|.
void AttachToChannel(Channel* channel, ChannelEndpointId local_id);

// TODO(vtl): Combine this with |AttachToChannel()|.
void Run(ChannelEndpointId remote_id);

// Called by |Channel| when it takes a reference to this object. It will send
// all queue messages (in |paused_message_queue_|).
// TODO(vtl): Maybe rename this "OnAttach"?
void AttachAndRun(Channel* channel,
ChannelEndpointId local_id,
ChannelEndpointId remote_id);

// Called by |Channel| when it receives a message for the message pipe.
bool OnReadMessage(const MessageInTransit::View& message_view,
embedder::ScopedPlatformHandleVectorPtr platform_handles);
Expand Down
15 changes: 2 additions & 13 deletions mojo/edk/system/channel_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,7 @@ TEST_F(ChannelTest, ShutdownAfterAttach) {
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));

ChannelEndpointId local_id = channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(ChannelEndpointId::GetBootstrap(), local_id);

// TODO(vtl): Currently, we always "expect" a |RunMessagePipeEndpoint()| after
// an |AttachEndpoint()| (which is actually incorrect). We need to refactor
// |AttachEndpoint()| to indicate whether |Run...()| will necessarily be
// called or not. (Then, in the case that it may not be called, we should test
// a |Shutdown()| without the |Run...()|.)
channel()->RunEndpoint(channel_endpoint, ChannelEndpointId::GetBootstrap());
channel()->AttachAndRunEndpoint(channel_endpoint, true);

Waiter waiter;
waiter.Init();
Expand Down Expand Up @@ -287,10 +279,7 @@ TEST_F(ChannelTest, WaitAfterAttachRunAndShutdown) {
scoped_refptr<MessagePipe> mp(
MessagePipe::CreateLocalProxy(&channel_endpoint));

ChannelEndpointId local_id = channel()->AttachEndpoint(channel_endpoint);
EXPECT_EQ(ChannelEndpointId::GetBootstrap(), local_id);

channel()->RunEndpoint(channel_endpoint, ChannelEndpointId::GetBootstrap());
channel()->AttachAndRunEndpoint(channel_endpoint, true);

io_thread()->PostTaskAndWait(
FROM_HERE,
Expand Down
17 changes: 7 additions & 10 deletions mojo/edk/system/message_pipe_test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,13 @@ void ChannelThread::InitChannelOnIOThread(
channel_ = new Channel(platform_support_);
CHECK(channel_->Init(RawChannel::Create(platform_handle.Pass())));

// Attach the message pipe endpoint.
// Note: On the "server" (parent process) side, we need not attach the
// message pipe endpoint immediately. However, on the "client" (child
// process) side, this *must* be done here -- otherwise, the |Channel| may
// receive/process messages (which it can do as soon as it's hooked up to
// the IO thread message loop, and that message loop runs) before the
// message pipe endpoint is attached.
CHECK_EQ(channel_->AttachEndpoint(channel_endpoint),
ChannelEndpointId::GetBootstrap());
channel_->RunEndpoint(channel_endpoint, ChannelEndpointId::GetBootstrap());
// Attach and run the endpoint.
// Note: On the "server" (parent process) side, we need not attach/run the
// endpoint immediately. However, on the "client" (child process) side, this
// *must* be done here -- otherwise, the |Channel| may receive/process
// messages (which it can do as soon as it's hooked up to the IO thread
// message loop, and that message loop runs) before the endpoint is attached.
channel_->AttachAndRunEndpoint(channel_endpoint, true);
}

void ChannelThread::ShutdownChannelOnIOThread() {
Expand Down
9 changes: 1 addition & 8 deletions mojo/edk/system/remote_message_pipe_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,7 @@ class RemoteMessagePipeTest : public testing::Test {
CHECK(channel_index == 0 || channel_index == 1);

CreateAndInitChannel(channel_index);
ChannelEndpointId endpoint_id =
channels_[channel_index]->AttachEndpoint(ep);
if (!endpoint_id.is_valid())
return;

CHECK_EQ(endpoint_id, ChannelEndpointId::GetBootstrap());
channels_[channel_index]->RunEndpoint(ep,
ChannelEndpointId::GetBootstrap());
channels_[channel_index]->AttachAndRunEndpoint(ep, true);
}

void RestoreInitialStateOnIOThread() {
Expand Down

0 comments on commit f4e1eec

Please sign in to comment.