Skip to content

Commit

Permalink
Mojo: Introduce [NoInterrupt] attribute
Browse files Browse the repository at this point in the history
GpuChannelHost uses its own custom sync waiting logic for the few sync
IPCs it sends. The key (and intentional) difference is that -- unlike
legacy and Mojo sync IPC -- it doesn't allow for wake-ups to dispatch
other incoming sync messages on the waiting thread.

This adds mojom support for a [NoInterrupt] attribute on [Sync] messages
to convey that such messages should employ the same behavior,
effectively only waking up for the caller's expected reply, or for other
sync messages *on the same pipe only* (which in practice is much easier
to control for, generally unlikely to be used, and not relevant to the
GPU case.)

Bug: 1196476
Change-Id: Ib14197cd7107d3c9388124d436e0548a350b599c
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2815833
Reviewed-by: Oksana Zhuravlova <oksamyt@chromium.org>
Commit-Queue: Ken Rockot <rockot@google.com>
Cr-Commit-Position: refs/heads/master@{#874450}
  • Loading branch information
krockot authored and Chromium LUCI CQ committed Apr 20, 2021
1 parent 471fa33 commit 5427297
Show file tree
Hide file tree
Showing 13 changed files with 443 additions and 28 deletions.
8 changes: 6 additions & 2 deletions ipc/ipc_mojo_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,16 +570,20 @@ class ChannelAssociatedGroupController
sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
}

bool SyncWatch(const bool* should_stop) override {
bool SyncWatch(mojo::SyncWatchMode mode, const bool& should_stop) override {
DCHECK(task_runner_->RunsTasksInCurrentSequence());

// We don't support [NoInterrupt] messages on Channel-associated
// interfaces.
DCHECK_EQ(mode, mojo::SyncWatchMode::kAllowInterrupt);

// It's not legal to make sync calls from the primary endpoint's thread,
// and in fact they must only happen from the proxy task runner.
DCHECK(!controller_->task_runner_->BelongsToCurrentThread());
DCHECK(controller_->proxy_task_runner_->BelongsToCurrentThread());

EnsureSyncWatcherExists();
return sync_watcher_->SyncWatch(should_stop);
return sync_watcher_->SyncWatch(&should_stop);
}

void RegisterExternalSyncWaiter(uint64_t request_id) override {}
Expand Down
18 changes: 17 additions & 1 deletion mojo/public/cpp/bindings/interface_endpoint_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ namespace mojo {

class Message;

// Indicates how a SyncWatch call should behave.
enum class SyncWatchMode {
// Other sync events are allowed to dispatch during this sync wait. For
// example if an incoming sync IPC targets some other receiver bound on the
// waiting thread, we'll allow that message to dispatch before we return to
// waiting. This is the safer and preferred behavior, and the default for all
// [Sync] messages.
kAllowInterrupt,

// The wait will only wake up once its waiting condition is met, and no other
// messages (sync or async) will be dispatched on the waiting thread until
// that happens and control is returned to the caller. While this is sometimes
// desirable, it is naturally more prone to deadlocks than `kAllowInterrupt`.
kNoInterrupt,
};

// A control interface exposed by AssociatedGroupController for interface
// endpoints.
class InterfaceEndpointController {
Expand All @@ -31,7 +47,7 @@ class InterfaceEndpointController {
// - return false otherwise, including
// MultiplexRouter::DetachEndpointClient() being called for the same
// interface endpoint.
virtual bool SyncWatch(const bool* should_stop) = 0;
virtual bool SyncWatch(SyncWatchMode mode, const bool& should_stop) = 0;

// Notifies the controller that a specific in-flight sync message identified
// by `request_id` has an off-thread sync waiter, so its reply must be
Expand Down
24 changes: 19 additions & 5 deletions mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ void ThreadSafeInterfaceEndpointClientProxy::SendMessageWithResponder(
}

// If the Remote is bound on another sequence, post the call.
const bool allow_interrupt = !message.has_flag(Message::kFlagNoInterrupt);
auto response = base::MakeRefCounted<SyncResponseInfo>();
auto response_signaler = std::make_unique<SyncResponseSignaler>(response);
task_runner_->PostTask(
Expand All @@ -384,9 +385,18 @@ void ThreadSafeInterfaceEndpointClientProxy::SendMessageWithResponder(
sync_calls->pending_responses.push_back(response.get());
}

SyncEventWatcher watcher(&response->event, base::DoNothing());
const bool* stop_flags[] = {&response->received, &response->cancelled};
watcher.SyncWatch(stop_flags, base::size(stop_flags));
if (allow_interrupt) {
// In the common case where interrupts are allowed, we watch cooperatively
// with other potential endpoints on the same thread.
SyncEventWatcher watcher(&response->event, base::DoNothing());
const bool* stop_flags[] = {&response->received, &response->cancelled};
watcher.SyncWatch(stop_flags, base::size(stop_flags));
} else {
// Else we can wait on the event directly. It will only signal after our
// reply has been processed or cancelled.
response->event.Wait();
DCHECK(response->received || response->cancelled);
}

{
base::AutoLock l(sync_calls->lock);
Expand Down Expand Up @@ -580,7 +590,11 @@ bool InterfaceEndpointClient::SendMessageWithResponder(
// message before calling |SendMessage()| below.
#endif

bool is_sync = message->has_flag(Message::kFlagIsSync);
const bool is_sync = message->has_flag(Message::kFlagIsSync);
const SyncWatchMode sync_watch_mode =
message->has_flag(Message::kFlagNoInterrupt)
? SyncWatchMode::kNoInterrupt
: SyncWatchMode::kAllowInterrupt;
if (!controller_->SendMessage(message))
return false;

Expand All @@ -606,7 +620,7 @@ bool InterfaceEndpointClient::SendMessageWithResponder(

base::WeakPtr<InterfaceEndpointClient> weak_self =
weak_ptr_factory_.GetWeakPtr();
controller_->SyncWatch(&response_received);
controller_->SyncWatch(sync_watch_mode, response_received);
// Make sure that this instance hasn't been destroyed.
if (weak_self) {
DCHECK(base::Contains(sync_responses_, request_id));
Expand Down
15 changes: 12 additions & 3 deletions mojo/public/cpp/bindings/lib/multiplex_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,20 @@ class MultiplexRouter::InterfaceEndpoint
sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence();
}

bool SyncWatch(const bool* should_stop) override {
bool SyncWatch(SyncWatchMode mode, const bool& should_stop) override {
DCHECK(task_runner_->RunsTasksInCurrentSequence());

EnsureSyncWatcherExists();
return sync_watcher_->SyncWatch(should_stop);
if (mode == SyncWatchMode::kAllowInterrupt) {
EnsureSyncWatcherExists();
return sync_watcher_->SyncWatch(&should_stop);
}

DCHECK_EQ(mode, SyncWatchMode::kNoInterrupt);
while (!should_stop) {
if (!router_->WaitForIncomingMessage())
return false;
}
return true;
}

void RegisterExternalSyncWaiter(uint64_t request_id) override {
Expand Down
1 change: 1 addition & 0 deletions mojo/public/cpp/bindings/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS_BASE) Message {
static const uint32_t kFlagExpectsResponse = 1 << 0;
static const uint32_t kFlagIsResponse = 1 << 1;
static const uint32_t kFlagIsSync = 1 << 2;
static const uint32_t kFlagNoInterrupt = 1 << 3;

// Constructs an uninitialized Message object.
Message();
Expand Down
1 change: 1 addition & 0 deletions mojo/public/cpp/bindings/tests/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ mojom("test_mojom") {
"remote_unittest.test-mojom",
"service_factory_unittest.test-mojom",
"struct_headers_unittest.test-mojom",
"sync_method_unittest.test-mojom",
]

public_deps = [
Expand Down
Loading

0 comments on commit 5427297

Please sign in to comment.