Skip to content

Commit

Permalink
Mojo: Kill endpoints on Channel Shutdown().
Browse files Browse the repository at this point in the history
That is:
* Don't require that endpoints be Close()d before Shutdown().
* On Shutdown(), it'll appear as if the remote endpoint has been closed.

R=darin@chromium.org
BUG=365176,360081

Review URL: https://codereview.chromium.org/244583003

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@265153 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
viettrungluu@chromium.org committed Apr 22, 2014
1 parent 4f18ed5 commit 0a82b04
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 51 deletions.
73 changes: 44 additions & 29 deletions mojo/system/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@

#include "mojo/system/channel.h"

#include <algorithm>

#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/strings/stringprintf.h"
#include "build/build_config.h" // TODO(vtl): Remove this.
#include "mojo/system/message_pipe_endpoint.h"

namespace mojo {
Expand Down Expand Up @@ -46,7 +49,7 @@ bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {

// No need to take |lock_|, since this must be called before this object
// becomes thread-safe.
DCHECK(!raw_channel_);
DCHECK(!is_running_no_lock());
raw_channel_ = raw_channel.Pass();

if (!raw_channel_->Init(this)) {
Expand All @@ -60,32 +63,35 @@ bool Channel::Init(scoped_ptr<RawChannel> raw_channel) {
void Channel::Shutdown() {
DCHECK(creation_thread_checker_.CalledOnValidThread());

base::AutoLock locker(lock_);
DCHECK(raw_channel_.get());
raw_channel_->Shutdown();
raw_channel_.reset();

// This shouldn't usually occur, but it should be okay if all the endpoints
// are zombies (i.e., waiting to be removed, and not actually having any
// references to |MessagePipe|s).
// TODO(vtl): To make this actually okay, we need to make sure the other side
// channels being killed off properly.
LOG_IF(WARNING, !local_id_to_endpoint_info_map_.empty())
<< "Channel shutting down with endpoints still attached "
"(hopefully all zombies)";

#ifndef NDEBUG
// Check that everything left is a zombie. (Note: We don't explicitly clear
// |local_id_to_endpoint_info_map_|, since that would likely put us in an
// inconsistent state if we have non-zombies.)
for (IdToEndpointInfoMap::const_iterator it =
IdToEndpointInfoMap local_id_to_endpoint_info_map;
{
base::AutoLock locker(lock_);
if (!is_running_no_lock())
return;

raw_channel_->Shutdown();
raw_channel_.reset();

// We need to deal with it outside the lock.
std::swap(local_id_to_endpoint_info_map, local_id_to_endpoint_info_map_);
}

size_t num_live = 0;
size_t num_zombies = 0;
for (IdToEndpointInfoMap::iterator it =
local_id_to_endpoint_info_map_.begin();
it != local_id_to_endpoint_info_map_.end();
++it) {
DCHECK_NE(it->second.state, EndpointInfo::STATE_NORMAL);
DCHECK(!it->second.message_pipe.get());
if (it->second.state == EndpointInfo::STATE_NORMAL) {
it->second.message_pipe->OnRemove(it->second.port);
num_live++;
} else {
DCHECK(!it->second.message_pipe.get());
num_zombies++;
}
}
#endif
DVLOG(2) << "Shut down Channel with " << num_live << " live endpoints and "
<< num_zombies << " zombies";
}

MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
Expand Down Expand Up @@ -190,7 +196,7 @@ void Channel::RunRemoteMessagePipeEndpoint(

bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {
base::AutoLock locker(lock_);
if (!raw_channel_.get()) {
if (!is_running_no_lock()) {
// TODO(vtl): I think this is probably not an error condition, but I should
// think about it (and the shutdown sequence) more carefully.
LOG(WARNING) << "WriteMessage() after shutdown";
Expand All @@ -202,7 +208,8 @@ bool Channel::WriteMessage(scoped_ptr<MessageInTransit> message) {

bool Channel::IsWriteBufferEmpty() {
base::AutoLock locker(lock_);
DCHECK(raw_channel_.get());
if (!is_running_no_lock())
return true;
return raw_channel_->IsWriteBufferEmpty();
}

Expand All @@ -214,6 +221,9 @@ void Channel::DetachMessagePipeEndpoint(
bool should_send_remove_message = false;
{
base::AutoLock locker_(lock_);
if (!is_running_no_lock())
return;

IdToEndpointInfoMap::iterator it =
local_id_to_endpoint_info_map_.find(local_id);
DCHECK(it != local_id_to_endpoint_info_map_.end());
Expand Down Expand Up @@ -251,7 +261,7 @@ void Channel::DetachMessagePipeEndpoint(

Channel::~Channel() {
// The channel should have been shut down first.
DCHECK(!raw_channel_.get());
DCHECK(!is_running_no_lock());
}

void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
Expand All @@ -276,8 +286,13 @@ void Channel::OnReadMessage(const MessageInTransit::View& message_view) {
}

void Channel::OnFatalError(FatalError fatal_error) {
// TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
NOTIMPLEMENTED();
LOG(ERROR) << "RawChannel fatal error (type " << fatal_error << ")";
// TODO(vtl): We have some nested-deletion bugs on Windows, so this crashes.
#if defined(OS_WIN)
LOG(ERROR) << "Not shutting down due Windows-only bug";
#else
Shutdown();
#endif
}

bool Channel::ValidateReadMessage(const MessageInTransit::View& message_view) {
Expand Down Expand Up @@ -309,7 +324,7 @@ void Channel::OnReadMessageForDownstream(
// Since we own |raw_channel_|, and this method and |Shutdown()| should only
// be called from the creation thread, |raw_channel_| should never be null
// here.
DCHECK(raw_channel_.get());
DCHECK(is_running_no_lock());

IdToEndpointInfoMap::const_iterator it =
local_id_to_endpoint_info_map_.find(local_id);
Expand Down
46 changes: 24 additions & 22 deletions mojo/system/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,29 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
MessageInTransit::EndpointId remote_id);

private:
struct EndpointInfo {
enum State {
// Attached, possibly running or not.
STATE_NORMAL,
// "Zombie" states:
// Waiting for |DetachMessagePipeEndpoint()| before removing.
STATE_WAIT_LOCAL_DETACH,
// Waiting for a |kSubtypeChannelRemoveMessagePipeEndpointAck| before
// removing.
STATE_WAIT_REMOTE_REMOVE_ACK,
// Waiting for both of the above conditions before removing.
STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK,
};

EndpointInfo();
EndpointInfo(scoped_refptr<MessagePipe> message_pipe, unsigned port);
~EndpointInfo();

State state;
scoped_refptr<MessagePipe> message_pipe;
unsigned port;
};

friend class base::RefCountedThreadSafe<Channel>;
virtual ~Channel();

Expand Down Expand Up @@ -142,28 +165,7 @@ class MOJO_SYSTEM_IMPL_EXPORT Channel
MessageInTransit::EndpointId source_id,
MessageInTransit::EndpointId destination_id);

struct EndpointInfo {
enum State {
// Attached, possibly running or not.
STATE_NORMAL,
// "Zombie" states:
// Waiting for |DetachMessagePipeEndpoint()| before removing.
STATE_WAIT_LOCAL_DETACH,
// Waiting for a |kSubtypeChannelRemoveMessagePipeEndpointAck| before
// removing.
STATE_WAIT_REMOTE_REMOVE_ACK,
// Waiting for both of the above conditions before removing.
STATE_WAIT_LOCAL_DETACH_AND_REMOTE_REMOVE_ACK,
};

EndpointInfo();
EndpointInfo(scoped_refptr<MessagePipe> message_pipe, unsigned port);
~EndpointInfo();

State state;
scoped_refptr<MessagePipe> message_pipe;
unsigned port;
};
bool is_running_no_lock() const { return raw_channel_; }

base::ThreadChecker creation_thread_checker_;

Expand Down

0 comments on commit 0a82b04

Please sign in to comment.