Skip to content

Commit

Permalink
Defer IPC Channel receiving until safe
Browse files Browse the repository at this point in the history
For IPC Channels created directly on the main thread it's possible to
start IO-thread receiving too early. The net result is that very rarely
the initial SetPeerPid message may get dropped and stall the channel.

The fix is to wait until the internal Channel interface endpoints
are attached to the pipe before we start reading any messages from it.

Bug: 1216193
Change-Id: I8c33a7d171ae673cd7fdb06e4f7bc6ac48028c6f
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2941102
Commit-Queue: Ken Rockot <rockot@google.com>
Reviewed-by: Tom Sepez <tsepez@chromium.org>
Cr-Commit-Position: refs/heads/master@{#889480}
  • Loading branch information
krockot authored and Chromium LUCI CQ committed Jun 4, 2021
1 parent a36c4bc commit 9fae8bc
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 31 deletions.
1 change: 1 addition & 0 deletions ipc/ipc_channel_mojo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ bool ChannelMojo::Connect() {
void ChannelMojo::FinishConnectOnIOThread() {
DCHECK(message_reader_);
message_reader_->FinishInitializationOnIOThread(GetSelfPID());
bootstrap_->StartReceiving();
}

void ChannelMojo::Pause() {
Expand Down
55 changes: 27 additions & 28 deletions ipc/ipc_mojo_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,29 +173,6 @@ class ChannelAssociatedGroupController
*count = top_message_info_and_count.second;
}

void Bind(mojo::ScopedMessagePipeHandle handle) {
connector_ = std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
"IPC Channel");
connector_->set_incoming_receiver(&dispatcher_);
connector_->set_connection_error_handler(
base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
base::Unretained(this)));
connector_->set_enforce_errors_from_incoming_receiver(false);
if (quota_checker_)
connector_->SetMessageQuotaChecker(quota_checker_);

// Don't let the Connector do any sort of queuing on our behalf. Individual
// messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
// of messages received by this Connector) are already individually
// scheduled for dispatch by ChannelProxy, so Connector's normal mode of
// operation would only introduce a redundant scheduling step for most
// messages.
connector_->set_force_immediate_dispatch(true);

connector_->StartReceiving(task_runner_);
}

void Pause() {
DCHECK(!paused_);
paused_ = true;
Expand All @@ -219,9 +196,28 @@ class ChannelAssociatedGroupController
SendMessage(&message);
}

void CreateChannelEndpoints(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
void Bind(mojo::ScopedMessagePipeHandle handle,
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) {
connector_ = std::make_unique<mojo::Connector>(
std::move(handle), mojo::Connector::SINGLE_THREADED_SEND,
"IPC Channel");
connector_->set_incoming_receiver(&dispatcher_);
connector_->set_connection_error_handler(
base::BindOnce(&ChannelAssociatedGroupController::OnPipeError,
base::Unretained(this)));
connector_->set_enforce_errors_from_incoming_receiver(false);
if (quota_checker_)
connector_->SetMessageQuotaChecker(quota_checker_);

// Don't let the Connector do any sort of queuing on our behalf. Individual
// messages bound for the IPC::ChannelProxy thread (i.e. that vast majority
// of messages received by this Connector) are already individually
// scheduled for dispatch by ChannelProxy, so Connector's normal mode of
// operation would only introduce a redundant scheduling step for most
// messages.
connector_->set_force_immediate_dispatch(true);

mojo::InterfaceId sender_id, receiver_id;
if (set_interface_id_namespace_bit_) {
sender_id = 1 | mojo::kInterfaceIdNamespaceMask;
Expand Down Expand Up @@ -252,6 +248,8 @@ class ChannelAssociatedGroupController
std::move(receiver_handle));
}

void StartReceiving() { connector_->StartReceiving(task_runner_); }

void ShutDown() {
DCHECK(thread_checker_.CalledOnValidThread());
shut_down_ = true;
Expand Down Expand Up @@ -1147,10 +1145,11 @@ class MojoBootstrapImpl : public MojoBootstrap {
void Connect(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) override {
controller_->Bind(std::move(handle_));
controller_->CreateChannelEndpoints(sender, receiver);
controller_->Bind(std::move(handle_), sender, receiver);
}

void StartReceiving() override { controller_->StartReceiving(); }

void Pause() override {
controller_->Pause();
}
Expand Down
8 changes: 7 additions & 1 deletion ipc/ipc_mojo_bootstrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,17 @@ class COMPONENT_EXPORT(IPC) MojoBootstrap {
const scoped_refptr<base::SingleThreadTaskRunner>& proxy_task_runner,
const scoped_refptr<mojo::internal::MessageQuotaChecker>& quota_checker);

// Start the handshake over the underlying message pipe.
// Initialize the Channel pipe and interface endpoints. This performs all
// setup except actually starting to read messages off the pipe.
virtual void Connect(
mojo::PendingAssociatedRemote<mojom::Channel>* sender,
mojo::PendingAssociatedReceiver<mojom::Channel>* receiver) = 0;

// Enable incoming messages to start being read off the pipe and routed to
// endpoints. Must not be called until the pending endpoints created by
// Connect() are actually bound somewhere.
virtual void StartReceiving() = 0;

// Stop transmitting messages and start queueing them instead.
virtual void Pause() = 0;

Expand Down
5 changes: 5 additions & 0 deletions ipc/ipc_mojo_bootstrap_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class Connection {
bootstrap_->Connect(&sender, &receiver_);
sender_.Bind(std::move(sender));
sender_->SetPeerPid(sender_id);

// It's OK to start receiving right away even though `receiver_` isn't
// bound, because all of these tests are single-threaded and it will be
// bound before any incoming messages can be scheduled for processing.
bootstrap_->StartReceiving();
}

void TakeReceiver(
Expand Down
5 changes: 3 additions & 2 deletions mojo/public/cpp/bindings/lib/connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,11 @@ void Connector::StartReceiving(
DCHECK(!task_runner_);
task_runner_ = std::move(task_runner);
allow_woken_up_by_others_ = allow_woken_up_by_others;

DETACH_FROM_SEQUENCE(sequence_checker_);
if (task_runner_->RunsTasksInCurrentSequence()) {
WaitToReadMore();
} else {
DETACH_FROM_SEQUENCE(sequence_checker_);
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&Connector::WaitToReadMore, weak_factory_.GetWeakPtr()));
Expand Down Expand Up @@ -313,7 +314,7 @@ bool Connector::PrefersSerializedMessages() {
}

bool Connector::Accept(Message* message) {
if (!lock_)
if (!lock_ && task_runner_)
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

if (error_)
Expand Down

0 comments on commit 9fae8bc

Please sign in to comment.