Mojo: Make some attempts towards fixing remote message pipe closure.
It's not quite right yet (I still need to fix how they get started -- i.e.,
attached and run -- since that races in a poor way with how they get closed),
but I think I'm getting closer.

There's an intentional cycle of ref-counted objects between the
ProxyMessagePipeEndpoint and the Channel, which needs to be broken when
one of the (real) endpoints of the message pipe is closed. The picture
is:

LocalMPE-ProxyMPE-Channel <-----> Channel-ProxyMPE-LocalMPE

When a LocalMessagePipeEndpoint is closed, ultimately the other
LocalMessagePipeEndpoint needs to be informed ("OnPeerClose"), and the
two ProxyMessagePipeEndpoints can be torn down.
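
To make the cycle concrete, here's a minimal standalone sketch.
(std::shared_ptr stands in for Chromium's scoped_refptr; the class shapes
are my assumption -- only the names come from this CL.)

#include <map>
#include <memory>

struct Channel;

struct ProxyMessagePipeEndpoint {
  std::shared_ptr<Channel> channel;  // The proxy holds the Channel alive.
};

struct MessagePipe {
  ProxyMessagePipeEndpoint proxy;  // One port proxies to the remote side.
};

struct Channel {
  // ...and the Channel's ID table holds the MessagePipe (and thus its
  // proxy endpoint) alive. Neither side of the cycle can die until the
  // entry for its local ID is erased from this map.
  std::map<unsigned, std::shared_ptr<MessagePipe> > endpoints;
};

Breaking the cycle means erasing the map entry -- which, as described
below, may only happen once it's known that the remote side won't send
to that ID anymore.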

There are many difficulties, however. First, while a
ProxyMessagePipeEndpoint may be torn down, its entry (with its local ID)
may not be removed from the Channel until it's known that the other side
will not send messages to that ID (because you can't re-use that ID
until then). Basically, this amounts to notifying the other side and
receiving an ack. (We can mark the entry in the Channel with a suitable
"zombie" state to handle this.)

Second, both LocalMessagePipeEndpoints may be closed "simultaneously" --
resulting in the removal notification messages "crossing paths". Of
course, once you receive a removal notification for a given ID, you know
the other side will not send any more messages to that ID. So, there's
no need to send an ack in that case.
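
That rule is visible in RemoveMessagePipeEndpoint() in the diff below;
distilled (reusing the State enum sketched above, with a plain std::map
standing in for the real ID table):

#include <map>

// Handle an incoming "remove" (possibly a crossing one) for |local_id|.
bool OnRemoveReceived(std::map<unsigned, State>* table, unsigned local_id) {
  std::map<unsigned, State>::iterator it = table->find(local_id);
  if (it == table->end())
    return false;  // Unknown ID: report a remote error.
  if (it->second == STATE_WAIT_REMOTE_REMOVE_ACK) {
    // Our own remove crossed theirs, so theirs doubles as the ack: just
    // erase the entry (the ID is reusable) and send no ack back.
    table->erase(it);
    return true;
  }
  // Normal case: wait for the local side to detach, and send the ack.
  it->second = STATE_WAIT_LOCAL_DETACH;
  return true;  // Caller sends kSubtypeChannelRemoveMessagePipeEndpointAck.
}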

There are also lock-order issues: removal of an ID from the channel
(upon receiving a removal message) races with the
LocalMessagePipeEndpoint (on the "local" side) being closed, and that
race has to be handled, as sketched below.
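
The new channel.cc handles that by copying the map entry under the
Channel's lock and calling into the MessagePipe only after the lock is
released (the pipe has its own lock). A fragment distilled from the diff
below -- member names from the CL, not a complete class:

  EndpointInfo endpoint_info;
  {
    base::AutoLock locker(lock_);
    IdToEndpointInfoMap::iterator it =
        local_id_to_endpoint_info_map_.find(local_id);
    if (it == local_id_to_endpoint_info_map_.end())
      return;  // Already gone: the local close won the race.
    endpoint_info = it->second;  // The copy keeps its own ref to the pipe.
  }  // |lock_| is released before re-entering MessagePipe code.
  endpoint_info.message_pipe->OnRemove(endpoint_info.port);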

Then there are races with the ProxyMessagePipeEndpoint being attached
and run (and possibly receiving a removal message). (This CL doesn't
address all those issues.)

Finally, note that sometimes you have to keep the
ProxyMessagePipeEndpoint alive past "OnPeerClose". E.g., it's valid for
you to write a whole bunch of messages to a MessagePipe endpoint, and
then immediately close it. The ProxyMessagePipeEndpoint must stay alive
until it's been attached/run and sent out all those messages (and a
remove message).
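
At the public API level the guarantee looks like this. (A sketch against
the Mojo C system API of this era; the header path and exact signatures
are from memory, so treat them as approximate.)

#include "mojo/public/c/system/core.h"  // Path is approximate.

// Queue a burst of messages, then close immediately. The close must not
// destroy the ProxyMessagePipeEndpoint: it has to stay attached/run until
// all the queued messages (and the remove message) have gone out.
void WriteBurstAndClose(MojoHandle endpoint) {
  static const char kMessage[] = "hello";
  for (int i = 0; i < 100; ++i) {
    MojoWriteMessage(endpoint, kMessage, sizeof(kMessage),
                     NULL, 0, MOJO_WRITE_MESSAGE_FLAG_NONE);
  }
  MojoClose(endpoint);  // Valid even with messages still queued.
}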

R=darin@chromium.org
BUG=360081

Review URL: https://codereview.chromium.org/240133005

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@264716 0039d316-1c4b-4281-b951-d872f2087c98
viettrungluu@chromium.org committed Apr 18, 2014
1 parent 23761b9 commit 3e8f1f0
Showing 15 changed files with 391 additions and 132 deletions.
2 changes: 2 additions & 0 deletions mojo/embedder/embedder.cc
@@ -44,6 +44,8 @@ static void CreateChannelOnIOThread(
// Attach the message pipe endpoint.
system::MessageInTransit::EndpointId endpoint_id =
channel_info->channel->AttachMessagePipeEndpoint(message_pipe, 1);
// We shouldn't get |kInvalidEndpointId| here -- since |CreateChannel()| is
// responsible for the local endpoint, and won't close it.
DCHECK_EQ(endpoint_id, system::Channel::kBootstrapEndpointId);
success = channel_info->channel->RunMessagePipeEndpoint(
system::Channel::kBootstrapEndpointId,
256 changes: 211 additions & 45 deletions mojo/system/channel.cc
@@ -21,12 +21,15 @@ COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
Channel::kBootstrapEndpointId;

Channel::EndpointInfo::EndpointInfo() {
Channel::EndpointInfo::EndpointInfo()
: state(STATE_NORMAL),
port() {
}

Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
unsigned port)
: message_pipe(message_pipe),
: state(STATE_NORMAL),
message_pipe(message_pipe),
port(port) {
}

@@ -62,23 +65,34 @@ void Channel::Shutdown() {
raw_channel_->Shutdown();
raw_channel_.reset();

// This should not occur, but it probably mostly results in leaking;
// (Explicitly clearing the |local_id_to_endpoint_info_map_| would likely put
// things in an inconsistent state, which is worse. Note that if the map is
// nonempty, we probably won't be destroyed, since the endpoints have a
// reference to us.)
LOG_IF(ERROR, !local_id_to_endpoint_info_map_.empty())
<< "Channel shutting down with endpoints still attached";
// TODO(vtl): This currently blows up, but the fix will be nontrivial.
// crbug.com/360081
//DCHECK(local_id_to_endpoint_info_map_.empty());
// This shouldn't usually occur, but it should be okay if all the endpoints
// are zombies (i.e., waiting to be removed, and not actually having any
// references to |MessagePipe|s).
// TODO(vtl): To make this actually okay, we need to make sure that the other
// side's channels are killed off properly.
LOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
<< "Channel shutting down with endpoints still attached "
"(hopefully all zombies)";

#ifndef NDEBUG
// Check that everything left is a zombie. (Note: We don't explicitly clear
// |local_id_to_endpoint_info_map_|, since that would likely put us in an
// inconsistent state if we have non-zombies.)
for (IdToEndpointInfoMap::const_iterator it =
local_id_to_endpoint_info_map_.begin();
it != local_id_to_endpoint_info_map_.end();
++it) {
DCHECK_NE(it->second.state, EndpointInfo::STATE_NORMAL);
DCHECK(!it->second.message_pipe.get());
}
#endif
}

MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
scoped_refptr<MessagePipe> message_pipe, unsigned port) {
scoped_refptr<MessagePipe> message_pipe,
unsigned port) {
DCHECK(message_pipe);
DCHECK(port == 0 || port == 1);
// Note: This assertion must *not* be done under |lock_|.
DCHECK_EQ(message_pipe->GetType(port), MessagePipeEndpoint::kTypeProxy);

MessageInTransit::EndpointId local_id;
{
@@ -98,8 +112,32 @@ MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
}

message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
return local_id;
// This might fail if that port got an |OnPeerClose()| before attaching.
if (message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id))
return local_id;

// Note: If it failed, quite possibly the endpoint info was removed from that
// map (there's a race between us adding it to the map above and calling
// |Attach()|). And even if an entry exists for |local_id|, we need to check
// that it's the one we added (and not some other one that was added since).
{
base::AutoLock locker(lock_);
IdToEndpointInfoMap::iterator it =
local_id_to_endpoint_info_map_.find(local_id);
if (it != local_id_to_endpoint_info_map_.end() &&
it->second.message_pipe.get() == message_pipe.get() &&
it->second.port == port) {
DCHECK_EQ(it->second.state, EndpointInfo::STATE_NORMAL);
// TODO(vtl): FIXME -- This is wrong. We need to specify (to
// |AttachMessagePipeEndpoint()|) who's going to be responsible for calling
// |RunMessagePipeEndpoint()| ("us", or the remote by sending us a
// |kSubtypeChannelRunMessagePipeEndpoint|). If the remote is going to
// run, then we'll get messages to an "invalid" local ID (for running, for
// removal).
local_id_to_endpoint_info_map_.erase(it);
}
}
return MessageInTransit::kInvalidEndpointId;
}

bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
@@ -115,6 +153,14 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
endpoint_info = it->second;
}

// Assume that this was in response to |kSubtypeChannelRunMessagePipeEndpoint|
// and ignore it.
if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
DVLOG(2) << "Ignoring run message pipe endpoint for zombie endpoint "
"(local ID " << local_id << ", remote ID " << remote_id << ")";
return true;
}

// TODO(vtl): FIXME -- We need to handle the case that the message pipe is
// already running when we're here due to
// |kSubtypeChannelRunMessagePipeEndpoint|.
endpoint_info.message_pipe->Run(endpoint_info.port, remote_id);
Expand All @@ -124,21 +170,21 @@ bool Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
void Channel::RunRemoteMessagePipeEndpoint(
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
base::AutoLock locker(lock_);

DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
local_id_to_endpoint_info_map_.end());
#if DCHECK_IS_ON
{
base::AutoLock locker(lock_);
DCHECK(local_id_to_endpoint_info_map_.find(local_id) !=
local_id_to_endpoint_info_map_.end());
}
#endif

scoped_ptr<MessageInTransit> message(new MessageInTransit(
MessageInTransit::kTypeChannel,
MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
0, 0, NULL));
message->set_source_id(local_id);
message->set_destination_id(remote_id);
if (!raw_channel_->WriteMessage(message.Pass())) {
// TODO(vtl): FIXME -- I guess we should report the error back somehow so
// that the dispatcher can be closed?
CHECK(false) << "Not yet handled";
if (!SendControlMessage(
MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint,
local_id, remote_id)) {
HandleLocalError(base::StringPrintf(
"Failed to send message to run remote message pipe endpoint (local ID "
"%u, remote ID %u)",
static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
}
}

@@ -160,20 +206,52 @@ bool Channel::IsWriteBufferEmpty() {
return raw_channel_->IsWriteBufferEmpty();
}

void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
void Channel::DetachMessagePipeEndpoint(
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);

base::AutoLock locker_(lock_);
local_id_to_endpoint_info_map_.erase(local_id);
bool should_send_remove_message = false;
{
base::AutoLock locker_(lock_);
IdToEndpointInfoMap::iterator it =
local_id_to_endpoint_info_map_.find(local_id);
DCHECK(it != local_id_to_endpoint_info_map_.end());

switch (it->second.state) {
case EndpointInfo::STATE_NORMAL:
it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
it->second.message_pipe = NULL;
should_send_remove_message =
(remote_id != MessageInTransit::kInvalidEndpointId);
break;
case EndpointInfo::STATE_WAIT_LOCAL_DETACH:
local_id_to_endpoint_info_map_.erase(it);
break;
case EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK:
NOTREACHED();
break;
case EndpointInfo::STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK:
it->second.state = EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK;
break;
}
}
if (!should_send_remove_message)
return;

if (!SendControlMessage(
MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint,
local_id, remote_id)) {
HandleLocalError(base::StringPrintf(
"Failed to send message to remove remote message pipe endpoint (local "
"ID %u, remote ID %u)",
static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
}
}

Channel::~Channel() {
// The channel should have been shut down first.
DCHECK(!raw_channel_.get());

DLOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
<< "Destroying Channel with " << local_id_to_endpoint_info_map_.size()
<< " endpoints still present";
}

void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
@@ -249,6 +327,13 @@ void Channel::OnReadMessageForDownstream(
endpoint_info = it->second;
}

// Ignore messages for zombie endpoints (not an error).
if (endpoint_info.state != EndpointInfo::STATE_NORMAL) {
DVLOG(2) << "Ignoring downstream message for zombie endpoint (local ID = "
<< local_id << ", remote ID = " << message_view.source_id() << ")";
return;
}

// We need to duplicate the message, because |EnqueueMessage()| will take
// ownership of it.
scoped_ptr<MessageInTransit> message(new MessageInTransit(message_view));
@@ -261,7 +346,7 @@
// error, e.g., if the remote side is sending invalid control messages (to
// the message pipe).
HandleLocalError(base::StringPrintf(
"Failed to enqueue message to local destination ID %u (result %d)",
"Failed to enqueue message to local ID %u (result %d)",
static_cast<unsigned>(local_id), static_cast<int>(result)));
return;
}
@@ -273,14 +358,35 @@ void Channel::OnReadMessageForChannel(

switch (message_view.subtype()) {
case MessageInTransit::kSubtypeChannelRunMessagePipeEndpoint:
// TODO(vtl): FIXME -- Error handling (also validation of
// source/destination IDs).
DVLOG(2) << "Handling channel message to run message pipe (local ID = "
<< message_view.destination_id() << ", remote ID = "
DVLOG(2) << "Handling channel message to run message pipe (local ID "
<< message_view.destination_id() << ", remote ID "
<< message_view.source_id() << ")";
if (!RunMessagePipeEndpoint(message_view.destination_id(),
message_view.source_id()))
HandleRemoteError("Received invalid channel run message pipe message");
message_view.source_id())) {
HandleRemoteError(
"Received invalid channel message to run message pipe");
}
break;
case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpoint:
DVLOG(2) << "Handling channel message to remove message pipe (local ID "
<< message_view.destination_id() << ", remote ID "
<< message_view.source_id() << ")";
if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
message_view.source_id())) {
HandleRemoteError(
"Received invalid channel message to remove message pipe");
}
break;
case MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck:
DVLOG(2) << "Handling channel message to ack remove message pipe (local "
"ID "
<< message_view.destination_id() << ", remote ID "
<< message_view.source_id() << ")";
if (!RemoveMessagePipeEndpoint(message_view.destination_id(),
message_view.source_id())) {
HandleRemoteError(
"Received invalid channel message to ack remove message pipe");
}
break;
default:
HandleRemoteError("Received invalid channel message");
@@ -289,6 +395,62 @@
}
}

bool Channel::RemoveMessagePipeEndpoint(
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
EndpointInfo endpoint_info;
{
base::AutoLock locker(lock_);

IdToEndpointInfoMap::iterator it =
local_id_to_endpoint_info_map_.find(local_id);
if (it == local_id_to_endpoint_info_map_.end()) {
DVLOG(2) << "Remove message pipe error: not found";
return false;
}

// If it's waiting for the remove ack, just do it and return.
if (it->second.state == EndpointInfo::STATE_WAIT_REMOTE_REMOVE_ACK) {
local_id_to_endpoint_info_map_.erase(it);
return true;
}

if (it->second.state != EndpointInfo::STATE_NORMAL) {
DVLOG(2) << "Remove message pipe error: wrong state";
return false;
}

it->second.state = EndpointInfo::STATE_WAIT_LOCAL_DETACH;
endpoint_info = it->second;
it->second.message_pipe = NULL;
}

if (!SendControlMessage(
MessageInTransit::kSubtypeChannelRemoveMessagePipeEndpointAck,
local_id, remote_id)) {
HandleLocalError(base::StringPrintf(
"Failed to send message to remove remote message pipe endpoint ack "
"(local ID %u, remote ID %u)",
static_cast<unsigned>(local_id), static_cast<unsigned>(remote_id)));
}

endpoint_info.message_pipe->OnRemove(endpoint_info.port);

return true;
}

bool Channel::SendControlMessage(MessageInTransit::Subtype subtype,
MessageInTransit::EndpointId local_id,
MessageInTransit::EndpointId remote_id) {
DVLOG(2) << "Sending channel control message: subtype " << subtype
<< ", local ID " << local_id << ", remote ID " << remote_id;
scoped_ptr<MessageInTransit> message(new MessageInTransit(
MessageInTransit::kTypeChannel, subtype, 0, 0, NULL));
message->set_source_id(local_id);
message->set_destination_id(remote_id);
return WriteMessage(message.Pass());
}

void Channel::HandleRemoteError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this? Probably we want to
// terminate the connection, since it's spewing invalid stuff.
@@ -297,6 +459,10 @@ void Channel::HandleRemoteError(const base::StringPiece& error_message) {

void Channel::HandleLocalError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this?
// Sometimes we'll want to propagate the error back to the message pipe
// (endpoint), and notify it that the remote is (effectively) closed.
// Sometimes we'll want to kill the channel (and notify all the endpoints that
// their remotes are dead).
LOG(WARNING) << error_message;
}

