From 1b1ee65f84ea20408262907ed4ba107836807a4a Mon Sep 17 00:00:00 2001
From: Ken Rockot
Date: Thu, 20 May 2021 15:26:23 +0000
Subject: [PATCH] GPU: Introduce CommandBuffer Mojo interface

This adds CommandBuffer as a new Mojo interface associated with its
owning GpuChannel, along with a corresponding CommandBufferClient
interface associated and running in the opposite direction. Since a
good bit of new infrastructure is involved in ensuring behavioral
parity with the current message dispatch, this CL migrates only one
message to each new interface for now.

To replicate the existing scheduling behavior of messages moved to the
CommandBuffer interface, a new gpu::SchedulerTaskRunner is introduced
to expose a base::SequencedTaskRunner interface for a specific sequence
on the gpu::Scheduler. The CommandBuffer's Mojo receiver binds using
this task runner.

This also requires loosening some sequence safety checks inside Mojo
bindings: we want all received messages to be dispatched by a
SchedulerTaskRunner, but we want to set up and tear down the receiver
from tasks that are NOT run by that SchedulerTaskRunner. Since the base
sequencing APIs (and in turn the internal safety checks in Mojo
bindings) aren't well suited to this kind of arrangement, we have to
pull a few strings: endpoints may now be bound to a sequence other than
the one doing the binding (this turns out to be trivially safe to
allow), and they can also be torn down from a sequence other than the
one to which they're bound, provided the caller knows what they're
doing and uses the dubious but aptly foreboding
ResetFromAnotherSequenceUnsafe().

Bug: 1196476
Change-Id: Ie2b2cd775c271f0e9a105949e9df65b88f21ffa2
Binary-Size: New usage of associated endpoints introduces a lot of new
  generated code.
  https://crbug.com/1208544
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/2863930
Commit-Queue: Ken Rockot
Reviewed-by: Sunny Sachanandani
Reviewed-by: Tom Sepez
Reviewed-by: Daniel Cheng
Cr-Commit-Position: refs/heads/master@{#885034}
---
 gpu/command_buffer/service/BUILD.gn           |  2 +
 .../service/scheduler_task_runner.cc          | 82 +++++++++++++++++
 .../service/scheduler_task_runner.h           | 52 +++++++++++
 gpu/ipc/client/command_buffer_proxy_impl.cc   | 20 ++--
 gpu/ipc/client/command_buffer_proxy_impl.h    | 14 ++-
 .../command_buffer_proxy_impl_unittest.cc     | 17 +++-
 gpu/ipc/client/gpu_channel_host.h             |  4 +
 gpu/ipc/common/BUILD.gn                       |  2 +
 gpu/ipc/common/gpu_channel.mojom              | 22 ++++-
 gpu/ipc/common/gpu_messages.h                 | 12 ---
 gpu/ipc/common/mock_command_buffer.cc         | 18 ++++
 gpu/ipc/common/mock_command_buffer.h          | 30 ++++++
 gpu/ipc/common/mock_gpu_channel.h             |  8 +-
 gpu/ipc/service/command_buffer_stub.cc        | 76 ++++++++++------
 gpu/ipc/service/command_buffer_stub.h         | 27 +++++-
 gpu/ipc/service/gpu_channel.cc                | 20 +++-
 gpu/ipc/service/gpu_channel.h                 |  2 +
 gpu/ipc/service/gpu_channel_test_common.cc    |  6 ++
 .../image_decode_accelerator_stub_unittest.cc |  5 +-
 ipc/ipc_mojo_bootstrap.cc                     | 91 +++++++++++++++----
 ipc/ipc_mojo_bootstrap.h                      | 60 ++++++++++++
 .../public/cpp/bindings/associated_receiver.h |  6 ++
 .../cpp/bindings/interface_endpoint_client.h  |  9 ++
 .../bindings/lib/interface_endpoint_client.cc | 16 +++-
 .../cpp/bindings/lib/multiplex_router.cc      |  2 -
 .../cpp/bindings/shared_associated_remote.h   | 32 +++++--
 26 files changed, 542 insertions(+), 93 deletions(-)
 create mode 100644 gpu/command_buffer/service/scheduler_task_runner.cc
 create mode 100644 gpu/command_buffer/service/scheduler_task_runner.h
 create mode 100644 gpu/ipc/common/mock_command_buffer.cc
 create mode 100644 gpu/ipc/common/mock_command_buffer.h

diff --git a/gpu/command_buffer/service/BUILD.gn b/gpu/command_buffer/service/BUILD.gn
index 675ffb9718ab36..4b23eb0e05511a 100644
--- a/gpu/command_buffer/service/BUILD.gn
+++ b/gpu/command_buffer/service/BUILD.gn
@@ -54,6 +54,8 @@ target(link_target_type, "service_sources") {
     "memory_tracking.h",
     "scheduler.cc",
     "scheduler.h",
+    "scheduler_task_runner.cc",
+    "scheduler_task_runner.h",
     "sequence_id.h",
     "sync_point_manager.cc",
     "sync_point_manager.h",
diff --git a/gpu/command_buffer/service/scheduler_task_runner.cc b/gpu/command_buffer/service/scheduler_task_runner.cc
new file mode 100644
index 00000000000000..5312e858ae053b
--- /dev/null
+++ b/gpu/command_buffer/service/scheduler_task_runner.cc
@@ -0,0 +1,82 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+ +#include "gpu/command_buffer/service/scheduler_task_runner.h" + +#include +#include + +#include "base/bind.h" +#include "base/check.h" +#include "base/no_destructor.h" +#include "base/threading/thread_local.h" +#include "gpu/command_buffer/common/sync_token.h" +#include "gpu/command_buffer/service/scheduler.h" + +namespace gpu { + +namespace { + +base::ThreadLocalPointer& +GetCurrentTaskRunnerStorage() { + static base::NoDestructor> + runner; + return *runner; +} + +void SetCurrentTaskRunner(const SchedulerTaskRunner* runner) { + GetCurrentTaskRunnerStorage().Set(runner); +} + +const SchedulerTaskRunner* GetCurrentTaskRunner() { + return GetCurrentTaskRunnerStorage().Get(); +} + +} // namespace + +SchedulerTaskRunner::SchedulerTaskRunner(Scheduler& scheduler, + SequenceId sequence_id) + : scheduler_(scheduler), sequence_id_(sequence_id) {} + +SchedulerTaskRunner::~SchedulerTaskRunner() = default; + +void SchedulerTaskRunner::ShutDown() { + is_running_ = false; +} + +bool SchedulerTaskRunner::PostDelayedTask(const base::Location& from_here, + base::OnceClosure task, + base::TimeDelta delay) { + return PostNonNestableDelayedTask(from_here, std::move(task), delay); +} + +bool SchedulerTaskRunner::PostNonNestableDelayedTask( + const base::Location& from_here, + base::OnceClosure task, + base::TimeDelta delay) { + if (!is_running_) + return false; + + CHECK(delay.is_zero()); + scheduler_.ScheduleTask(Scheduler::Task( + sequence_id_, + base::BindOnce(&SchedulerTaskRunner::RunTask, this, std::move(task)), + std::vector())); + return true; +} + +bool SchedulerTaskRunner::RunsTasksInCurrentSequence() const { + const SchedulerTaskRunner* current = GetCurrentTaskRunner(); + return current != nullptr && current->sequence_id_ == sequence_id_; +} + +void SchedulerTaskRunner::RunTask(base::OnceClosure task) { + // Scheduler doesn't nest tasks, so we don't support nesting. + DCHECK(!GetCurrentTaskRunner()); + SetCurrentTaskRunner(this); + std::move(task).Run(); + SetCurrentTaskRunner(nullptr); +} + +} // namespace gpu diff --git a/gpu/command_buffer/service/scheduler_task_runner.h b/gpu/command_buffer/service/scheduler_task_runner.h new file mode 100644 index 00000000000000..c9492ac38abb9d --- /dev/null +++ b/gpu/command_buffer/service/scheduler_task_runner.h @@ -0,0 +1,52 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef GPU_COMMAND_BUFFER_SERVICE_SCHEDULER_TASK_RUNNER_H_ +#define GPU_COMMAND_BUFFER_SERVICE_SCHEDULER_TASK_RUNNER_H_ + +#include "base/sequenced_task_runner.h" +#include "gpu/command_buffer/service/sequence_id.h" +#include "gpu/gpu_export.h" + +namespace gpu { + +class Scheduler; + +// A SequencedTaskRunner implementation to abstract task execution for a +// specific SequenceId on the GPU Scheduler. This object does not support +// delayed tasks, because the underlying Scheduler implementation does not +// support scheduling delayed tasks. Also note that tasks run by this object do +// not support running nested RunLoops. +class GPU_EXPORT SchedulerTaskRunner : public base::SequencedTaskRunner { + public: + // Constructs a SchedulerTaskRunner that runs tasks on `scheduler`, on the + // sequence identified by `sequence_id`. This instance must not outlive + // `scheduler`. + SchedulerTaskRunner(Scheduler& scheduler, SequenceId sequence_id); + + // Once this is called, all subsequent tasks will be rejected. 
+ void ShutDown(); + + // base::SequencedTaskRunner: + bool PostDelayedTask(const base::Location& from_here, + base::OnceClosure task, + base::TimeDelta delay) override; + bool PostNonNestableDelayedTask(const base::Location& from_here, + base::OnceClosure task, + base::TimeDelta delay) override; + bool RunsTasksInCurrentSequence() const override; + + private: + ~SchedulerTaskRunner() override; + + void RunTask(base::OnceClosure task); + + Scheduler& scheduler_; + const SequenceId sequence_id_; + bool is_running_ = true; +}; + +} // namespace gpu + +#endif // GPU_COMMAND_BUFFER_SERVICE_SCHEDULER_TASK_RUNNER_H_ diff --git a/gpu/ipc/client/command_buffer_proxy_impl.cc b/gpu/ipc/client/command_buffer_proxy_impl.cc index 4513dc9c82915f..61264c45f1c355 100644 --- a/gpu/ipc/client/command_buffer_proxy_impl.cc +++ b/gpu/ipc/client/command_buffer_proxy_impl.cc @@ -31,6 +31,7 @@ #include "gpu/ipc/common/gpu_channel.mojom.h" #include "gpu/ipc/common/gpu_messages.h" #include "gpu/ipc/common/gpu_param_traits.h" +#include "ipc/ipc_mojo_bootstrap.h" #include "mojo/public/cpp/bindings/sync_call_restrictions.h" #include "mojo/public/cpp/system/buffer.h" #include "mojo/public/cpp/system/platform_handle.h" @@ -119,15 +120,23 @@ ContextResult CommandBufferProxyImpl::Initialize( // TODO(piman): Make this asynchronous (http://crbug.com/125248). ContextResult result = ContextResult::kSuccess; mojo::SyncCallRestrictions::ScopedAllowSyncCall allow_sync; + IPC::ScopedAllowOffSequenceChannelAssociatedBindings allow_binding; bool sent = channel->GetGpuChannel().CreateCommandBuffer( - std::move(params), route_id_, std::move(region), &result, &capabilities_); + std::move(params), route_id_, std::move(region), + command_buffer_.BindNewEndpointAndPassReceiver(channel->io_task_runner()), + client_receiver_.BindNewEndpointAndPassRemote(callback_thread_), &result, + &capabilities_); if (!sent) { + command_buffer_.reset(); + client_receiver_.reset(); channel->RemoveRoute(route_id_); LOG(ERROR) << "ContextResult::kTransientFailure: " "Failed to send GpuControl.CreateCommandBuffer."; return ContextResult::kTransientFailure; } if (result != ContextResult::kSuccess) { + command_buffer_.reset(); + client_receiver_.reset(); DLOG(ERROR) << "Failure processing GpuControl.CreateCommandBuffer."; channel->RemoveRoute(route_id_); return result; @@ -142,7 +151,6 @@ bool CommandBufferProxyImpl::OnMessageReceived(const IPC::Message& message) { bool handled = true; IPC_BEGIN_MESSAGE_MAP(CommandBufferProxyImpl, message) IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_Destroyed, OnDestroyed); - IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_ConsoleMsg, OnConsoleMessage); IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_GpuSwitched, OnGpuSwitched); IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_SignalAck, OnSignalAck); IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_SwapBuffersCompleted, @@ -185,11 +193,9 @@ void CommandBufferProxyImpl::OnDestroyed(gpu::error::ContextLostReason reason, OnGpuAsyncMessageError(reason, error); } -void CommandBufferProxyImpl::OnConsoleMessage( - const GPUCommandBufferConsoleMessage& message) { +void CommandBufferProxyImpl::OnConsoleMessage(const std::string& message) { if (gpu_control_client_) - gpu_control_client_->OnGpuControlErrorMessage(message.message.c_str(), - message.id); + gpu_control_client_->OnGpuControlErrorMessage(message.c_str(), /*id=*/0); } void CommandBufferProxyImpl::OnGpuSwitched( @@ -359,7 +365,7 @@ void CommandBufferProxyImpl::SetGetBuffer(int32_t shm_id) { if (last_state_.error != gpu::error::kNoError) return; - Send(new 
GpuCommandBufferMsg_SetGetBuffer(route_id_, shm_id)); + command_buffer_->SetGetBuffer(shm_id); last_put_offset_ = -1; has_buffer_ = (shm_id > 0); } diff --git a/gpu/ipc/client/command_buffer_proxy_impl.h b/gpu/ipc/client/command_buffer_proxy_impl.h index 3a541f708a32ea..b027ccb3be9019 100644 --- a/gpu/ipc/client/command_buffer_proxy_impl.h +++ b/gpu/ipc/client/command_buffer_proxy_impl.h @@ -35,12 +35,14 @@ #include "gpu/command_buffer/common/gpu_memory_allocation.h" #include "gpu/command_buffer/common/scheduling_priority.h" #include "gpu/gpu_export.h" +#include "gpu/ipc/common/gpu_channel.mojom.h" #include "gpu/ipc/common/surface_handle.h" #include "ipc/ipc_listener.h" +#include "mojo/public/cpp/bindings/associated_receiver.h" +#include "mojo/public/cpp/bindings/shared_associated_remote.h" #include "ui/gfx/swap_result.h" #include "ui/gl/gpu_preference.h" -struct GPUCommandBufferConsoleMessage; class GURL; namespace base { @@ -67,7 +69,8 @@ class GpuMemoryBufferManager; // CommandBufferStub. class GPU_EXPORT CommandBufferProxyImpl : public gpu::CommandBuffer, public gpu::GpuControl, - public IPC::Listener { + public IPC::Listener, + public mojom::CommandBufferClient { public: class DeletionObserver { public: @@ -183,10 +186,12 @@ class GPU_EXPORT CommandBufferProxyImpl : public gpu::CommandBuffer, std::pair AllocateAndMapSharedMemory(size_t size); + // mojom::CommandBufferClient: + void OnConsoleMessage(const std::string& message) override; + // Message handlers: void OnDestroyed(gpu::error::ContextLostReason reason, gpu::error::Error error); - void OnConsoleMessage(const GPUCommandBufferConsoleMessage& message); void OnGpuSwitched(gl::GpuPreference active_gpu_heuristic); void OnSignalAck(uint32_t id, const CommandBuffer::State& state); void OnSwapBuffersCompleted(const SwapBuffersCompleteParams& params); @@ -271,6 +276,9 @@ class GPU_EXPORT CommandBufferProxyImpl : public gpu::CommandBuffer, int32_t last_put_offset_ = -1; bool has_buffer_ = false; + mojo::SharedAssociatedRemote command_buffer_; + mojo::AssociatedReceiver client_receiver_{this}; + // Next generated fence sync. uint64_t next_fence_sync_release_ = 1; diff --git a/gpu/ipc/client/command_buffer_proxy_impl_unittest.cc b/gpu/ipc/client/command_buffer_proxy_impl_unittest.cc index b89a407b742807..3d8dc4bf35f8ed 100644 --- a/gpu/ipc/client/command_buffer_proxy_impl_unittest.cc +++ b/gpu/ipc/client/command_buffer_proxy_impl_unittest.cc @@ -12,6 +12,7 @@ #include "gpu/command_buffer/common/context_creation_attribs.h" #include "gpu/ipc/client/gpu_channel_host.h" #include "gpu/ipc/common/gpu_channel.mojom.h" +#include "gpu/ipc/common/mock_command_buffer.h" #include "gpu/ipc/common/mock_gpu_channel.h" #include "gpu/ipc/common/surface_handle.h" #include "ipc/ipc_test_sink.h" @@ -83,7 +84,8 @@ class CommandBufferProxyImplTest : public testing::Test { base::RunLoop().RunUntilIdle(); } - std::unique_ptr CreateAndInitializeProxy() { + std::unique_ptr CreateAndInitializeProxy( + MockCommandBuffer* mock_command_buffer = nullptr) { auto proxy = std::make_unique( channel_, nullptr /* gpu_memory_buffer_manager */, 0 /* stream_id */, base::ThreadTaskRunnerHandle::Get()); @@ -91,12 +93,23 @@ class CommandBufferProxyImplTest : public testing::Test { // The Initialize() call below synchronously requests a new CommandBuffer // using the channel's GpuControl interface. Simulate success, since we're // not actually talking to the service in these tests. 
- EXPECT_CALL(mock_gpu_channel_, CreateCommandBuffer(_, _, _, _, _)) + EXPECT_CALL(mock_gpu_channel_, CreateCommandBuffer(_, _, _, _, _, _, _)) .Times(1) .WillOnce(Invoke( [&](mojom::CreateCommandBufferParamsPtr params, int32_t routing_id, base::UnsafeSharedMemoryRegion shared_state, + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote + client, ContextResult* result, Capabilities* capabilities) -> bool { + // There's no real GpuChannel pipe for this endpoint to use, so + // give it its own dedicated pipe for these tests. This allows the + // CommandBufferProxyImpl to make calls on its CommandBuffer + // endpoint, which will send them to `mock_command_buffer` if + // provided by the test. + receiver.EnableUnassociatedUsage(); + if (mock_command_buffer) + mock_command_buffer->Bind(std::move(receiver)); *result = ContextResult::kSuccess; return true; })); diff --git a/gpu/ipc/client/gpu_channel_host.h b/gpu/ipc/client/gpu_channel_host.h index d42478d3909d77..329f986e3153e9 100644 --- a/gpu/ipc/client/gpu_channel_host.h +++ b/gpu/ipc/client/gpu_channel_host.h @@ -78,6 +78,10 @@ class GPU_EXPORT GpuChannelHost int channel_id() const { return channel_id_; } + const scoped_refptr& io_task_runner() { + return io_thread_; + } + // Virtual for testing. virtual mojom::GpuChannel& GetGpuChannel(); diff --git a/gpu/ipc/common/BUILD.gn b/gpu/ipc/common/BUILD.gn index 84b67de2eaaa2c..4c75bd454e3934 100644 --- a/gpu/ipc/common/BUILD.gn +++ b/gpu/ipc/common/BUILD.gn @@ -740,6 +740,8 @@ source_set("mojom_traits") { source_set("test_support") { testonly = true sources = [ + "mock_command_buffer.cc", + "mock_command_buffer.h", "mock_gpu_channel.cc", "mock_gpu_channel.h", ] diff --git a/gpu/ipc/common/gpu_channel.mojom b/gpu/ipc/common/gpu_channel.mojom index 20f5e882f94b32..9805e7cbdd9bec 100644 --- a/gpu/ipc/common/gpu_channel.mojom +++ b/gpu/ipc/common/gpu_channel.mojom @@ -150,7 +150,9 @@ interface GpuChannel { // will create an offscreen backbuffer of dimensions `size`. [Sync, NoInterrupt] CreateCommandBuffer( CreateCommandBufferParams params, int32 routing_id, - mojo_base.mojom.UnsafeSharedMemoryRegion shared_state) + mojo_base.mojom.UnsafeSharedMemoryRegion shared_state, + pending_associated_receiver receiver, + pending_associated_remote client) => (ContextResult result, Capabilities capabilties); // The CommandBufferProxy sends this to the CommandBufferStub in its @@ -198,6 +200,24 @@ interface GpuChannel { => (CommandBufferState state); }; +// Interface used to issue commands to a specific CommandBuffer instance in the +// GPU process. +interface CommandBuffer { + // Sets the shared memory buffer to use for commands. The ID given here must + // correspond to one registered by a prior RegisterTransferBuffer IPC to the + // same CommandBuffer. + SetGetBuffer(int32 shm_id); +}; + +// Interface used by the GPU process to send the client messages from a specific +// CommandBuffer instance. +interface CommandBufferClient { + // Notifies the client about a console message emitted on behalf of the + // command buffer. These messages are intended to be exposed by + // developer-facing UI such as the DevTools console. + OnConsoleMessage(string message); +}; + // DeferredRequests are batched locally by clients and sent to the service only // when flushing the channel via GpuChannelHost's EnsureFlush or VerifyFlush. 
struct DeferredRequest { diff --git a/gpu/ipc/common/gpu_messages.h b/gpu/ipc/common/gpu_messages.h index 59474350772939..2b023cb1e49afb 100644 --- a/gpu/ipc/common/gpu_messages.h +++ b/gpu/ipc/common/gpu_messages.h @@ -58,11 +58,6 @@ #define IPC_MESSAGE_START GpuChannelMsgStart -IPC_STRUCT_BEGIN(GPUCommandBufferConsoleMessage) - IPC_STRUCT_MEMBER(int32_t, id) - IPC_STRUCT_MEMBER(std::string, message) -IPC_STRUCT_END() - IPC_STRUCT_BEGIN(GpuCommandBufferMsg_CreateImage_Params) IPC_STRUCT_MEMBER(int32_t, id) IPC_STRUCT_MEMBER(gfx::GpuMemoryBufferHandle, gpu_memory_buffer) @@ -139,13 +134,6 @@ IPC_MESSAGE_ROUTED1(GpuStreamTextureMsg_UpdateRotatedVisibleSize, // These are messages between a renderer process to the GPU process relating to // a single OpenGL context. -// Sets the shared memory buffer used for commands. -IPC_MESSAGE_ROUTED1(GpuCommandBufferMsg_SetGetBuffer, int32_t /* shm_id */) - -// Sent by the GPU process to display messages in the console. -IPC_MESSAGE_ROUTED1(GpuCommandBufferMsg_ConsoleMsg, - GPUCommandBufferConsoleMessage /* msg */) - // Sent by the GPU process to notify the renderer process of a GPU switch. IPC_MESSAGE_ROUTED1(GpuCommandBufferMsg_GpuSwitched, gl::GpuPreference /* active_gpu_heuristic */) diff --git a/gpu/ipc/common/mock_command_buffer.cc b/gpu/ipc/common/mock_command_buffer.cc new file mode 100644 index 00000000000000..9df19a7bc5f221 --- /dev/null +++ b/gpu/ipc/common/mock_command_buffer.cc @@ -0,0 +1,18 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "gpu/ipc/common/mock_command_buffer.h" + +namespace gpu { + +MockCommandBuffer::MockCommandBuffer() = default; + +MockCommandBuffer::~MockCommandBuffer() = default; + +void MockCommandBuffer::Bind( + mojo::PendingAssociatedReceiver receiver) { + receiver_.Bind(std::move(receiver)); +} + +} // namespace gpu diff --git a/gpu/ipc/common/mock_command_buffer.h b/gpu/ipc/common/mock_command_buffer.h new file mode 100644 index 00000000000000..4b331fda284170 --- /dev/null +++ b/gpu/ipc/common/mock_command_buffer.h @@ -0,0 +1,30 @@ +// Copyright 2021 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +#ifndef GPU_IPC_COMMON_MOCK_COMMAND_BUFFER_H_ +#define GPU_IPC_COMMON_MOCK_COMMAND_BUFFER_H_ + +#include "gpu/ipc/common/gpu_channel.mojom.h" +#include "mojo/public/cpp/bindings/associated_receiver.h" +#include "testing/gmock/include/gmock/gmock.h" + +namespace gpu { + +class MockCommandBuffer : public mojom::CommandBuffer { + public: + MockCommandBuffer(); + ~MockCommandBuffer() override; + + void Bind(mojo::PendingAssociatedReceiver receiver); + + // mojom::CommandBuffer: + MOCK_METHOD1(SetGetBuffer, void(int32_t)); + + private: + mojo::AssociatedReceiver receiver_{this}; +}; + +} // namespace gpu + +#endif // GPU_IPC_COMMON_MOCK_COMMAND_BUFFER_H_ diff --git a/gpu/ipc/common/mock_gpu_channel.h b/gpu/ipc/common/mock_gpu_channel.h index 1611427df77f29..7d0620b91a68f1 100644 --- a/gpu/ipc/common/mock_gpu_channel.h +++ b/gpu/ipc/common/mock_gpu_channel.h @@ -20,15 +20,19 @@ class MockGpuChannel : public mojom::GpuChannel { MOCK_METHOD0(TerminateForTesting, void()); MOCK_METHOD0(Flush, bool()); MOCK_METHOD1(Flush, void(FlushCallback)); - MOCK_METHOD4(CreateCommandBuffer, + MOCK_METHOD6(CreateCommandBuffer, void(mojom::CreateCommandBufferParamsPtr, int32_t, base::UnsafeSharedMemoryRegion, + mojo::PendingAssociatedReceiver, + mojo::PendingAssociatedRemote, CreateCommandBufferCallback)); - MOCK_METHOD5(CreateCommandBuffer, + MOCK_METHOD7(CreateCommandBuffer, bool(mojom::CreateCommandBufferParamsPtr, int32_t, base::UnsafeSharedMemoryRegion, + mojo::PendingAssociatedReceiver, + mojo::PendingAssociatedRemote, ContextResult*, Capabilities*)); MOCK_METHOD1(DestroyCommandBuffer, bool(int32_t)); diff --git a/gpu/ipc/service/command_buffer_stub.cc b/gpu/ipc/service/command_buffer_stub.cc index 67589b768be61b..9f56d20aab7fd3 100644 --- a/gpu/ipc/service/command_buffer_stub.cc +++ b/gpu/ipc/service/command_buffer_stub.cc @@ -28,6 +28,7 @@ #include "gpu/command_buffer/service/memory_tracking.h" #include "gpu/command_buffer/service/query_manager.h" #include "gpu/command_buffer/service/scheduler.h" +#include "gpu/command_buffer/service/scheduler_task_runner.h" #include "gpu/command_buffer/service/service_utils.h" #include "gpu/command_buffer/service/sync_point_manager.h" #include "gpu/config/gpu_crash_keys.h" @@ -37,6 +38,7 @@ #include "gpu/ipc/service/gpu_channel_manager_delegate.h" #include "gpu/ipc/service/gpu_watchdog_thread.h" #include "gpu/ipc/service/image_transport_surface.h" +#include "ipc/ipc_mojo_bootstrap.h" #include "ui/gl/gl_bindings.h" #include "ui/gl/gl_context.h" #include "ui/gl/gl_image.h" @@ -50,6 +52,7 @@ #endif namespace gpu { + struct WaitForCommandState { using Callback = CommandBufferStub::WaitForStateCallback; @@ -115,6 +118,9 @@ CommandBufferStub::CommandBufferStub( use_virtualized_gl_context_(false), command_buffer_id_(command_buffer_id), sequence_id_(sequence_id), + scheduler_task_runner_( + base::MakeRefCounted(*channel_->scheduler(), + sequence_id_)), stream_id_(stream_id), route_id_(route_id), last_flush_id_(0), @@ -187,7 +193,6 @@ bool CommandBufferStub::OnMessageReceived(const IPC::Message& message) { // handler can assume that the context is current (not necessary for // RetireSyncPoint or WaitSyncPoint). 
if (decoder_context_.get() && - message.type() != GpuCommandBufferMsg_SetGetBuffer::ID && message.type() != GpuCommandBufferMsg_RegisterTransferBuffer::ID && message.type() != GpuCommandBufferMsg_SignalSyncToken::ID && message.type() != GpuCommandBufferMsg_SignalQuery::ID) { @@ -201,7 +206,6 @@ bool CommandBufferStub::OnMessageReceived(const IPC::Message& message) { if (!handled) { handled = true; IPC_BEGIN_MESSAGE_MAP(CommandBufferStub, message) - IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_SetGetBuffer, OnSetGetBuffer); IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_RegisterTransferBuffer, OnRegisterTransferBuffer); IPC_MESSAGE_HANDLER(GpuCommandBufferMsg_SignalSyncToken, @@ -289,7 +293,7 @@ void CommandBufferStub::PerformWork() { bool CommandBufferStub::HasUnprocessedCommands() { if (command_buffer_) { - CommandBuffer::State state = command_buffer_->GetState(); + gpu::CommandBuffer::State state = command_buffer_->GetState(); return command_buffer_->put_offset() != state.get_offset && !error::IsError(state.error); } @@ -358,11 +362,11 @@ void CommandBufferStub::Destroy() { crash_keys::gpu_gl_context_is_virtual.Set(use_virtualized_gl_context_ ? "1" : "0"); if (wait_for_token_) { - std::move(wait_for_token_->callback).Run(CommandBuffer::State()); + std::move(wait_for_token_->callback).Run(gpu::CommandBuffer::State()); wait_for_token_.reset(); } if (wait_for_get_offset_) { - std::move(wait_for_get_offset_->callback).Run(CommandBuffer::State()); + std::move(wait_for_get_offset_->callback).Run(gpu::CommandBuffer::State()); wait_for_get_offset_.reset(); } @@ -413,12 +417,27 @@ void CommandBufferStub::Destroy() { } command_buffer_.reset(); + + scheduler_task_runner_->ShutDown(); + + // Note: `receiver_` runs tasks on `scheduler_task_runner_`, which is not the + // current task runner when this method runs. Hence we must use this unsafe + // reset to elide sequence safety checks. Its safety is guaranteed by the + // above ShutDown() call which ensures no further tasks will run on the + // sequence. 
+ receiver_.ResetFromAnotherSequenceUnsafe(); + client_.reset(); } -void CommandBufferStub::OnSetGetBuffer(int32_t shm_id) { - TRACE_EVENT0("gpu", "CommandBufferStub::OnSetGetBuffer"); - if (command_buffer_) +void CommandBufferStub::SetGetBuffer(int32_t shm_id) { + TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("devtools.timeline"), "GPUTask", + "data", DevToolsChannelData::CreateForChannel(channel())); + UpdateActiveUrl(); + TRACE_EVENT0("gpu", "CommandBufferStub::SetGetBuffer"); + if (command_buffer_) { command_buffer_->SetGetBuffer(shm_id); + CheckCompleteWaits(); + } } CommandBufferServiceClient::CommandBatchProcessedResult @@ -433,7 +452,7 @@ CommandBufferStub::OnCommandBatchProcessed() { void CommandBufferStub::OnParseError() { TRACE_EVENT0("gpu", "CommandBufferStub::OnParseError"); DCHECK(command_buffer_.get()); - CommandBuffer::State state = command_buffer_->GetState(); + gpu::CommandBuffer::State state = command_buffer_->GetState(); IPC::Message* msg = new GpuCommandBufferMsg_Destroyed( route_id_, state.context_lost_reason, state.error); msg->set_unblock(true); @@ -493,10 +512,10 @@ void CommandBufferStub::WaitForGetOffsetInRange(uint32_t set_get_buffer_count, void CommandBufferStub::CheckCompleteWaits() { bool has_wait = wait_for_token_ || wait_for_get_offset_; if (has_wait) { - CommandBuffer::State state = command_buffer_->GetState(); + gpu::CommandBuffer::State state = command_buffer_->GetState(); if (wait_for_token_ && - (CommandBuffer::InRange(wait_for_token_->start, wait_for_token_->end, - state.token) || + (gpu::CommandBuffer::InRange(wait_for_token_->start, + wait_for_token_->end, state.token) || state.error != error::kNoError)) { ReportState(); std::move(wait_for_token_->callback).Run(state); @@ -504,9 +523,9 @@ void CommandBufferStub::CheckCompleteWaits() { } if (wait_for_get_offset_ && (((wait_set_get_buffer_count_ == state.set_get_buffer_count) && - CommandBuffer::InRange(wait_for_get_offset_->start, - wait_for_get_offset_->end, - state.get_offset)) || + gpu::CommandBuffer::InRange(wait_for_get_offset_->start, + wait_for_get_offset_->end, + state.get_offset)) || state.error != error::kNoError)) { ReportState(); std::move(wait_for_get_offset_->callback).Run(state); @@ -537,7 +556,7 @@ void CommandBufferStub::OnAsyncFlush( DCHECK(!sync_point_client_state_->Wait(sync_token, base::DoNothing())); last_flush_id_ = flush_id; - CommandBuffer::State pre_state = command_buffer_->GetState(); + gpu::CommandBuffer::State pre_state = command_buffer_->GetState(); UpdateActiveUrl(); { @@ -548,7 +567,7 @@ void CommandBufferStub::OnAsyncFlush( command_buffer_->Flush(put_offset, decoder_context_.get()); } - CommandBuffer::State post_state = command_buffer_->GetState(); + gpu::CommandBuffer::State post_state = command_buffer_->GetState(); if (pre_state.get_offset != post_state.get_offset) ReportState(); @@ -600,7 +619,7 @@ void CommandBufferStub::OnSignalSyncToken(const SyncToken& sync_token, } void CommandBufferStub::OnSignalAck(uint32_t id) { - CommandBuffer::State state = command_buffer_->GetState(); + gpu::CommandBuffer::State state = command_buffer_->GetState(); ReportState(); Send(new GpuCommandBufferMsg_SignalAck(route_id_, id, state)); } @@ -652,13 +671,7 @@ void CommandBufferStub::HandleReturnData(base::span data) { void CommandBufferStub::OnConsoleMessage(int32_t id, const std::string& message) { - GPUCommandBufferConsoleMessage console_message; - console_message.id = id; - console_message.message = message; - IPC::Message* msg = - new GpuCommandBufferMsg_ConsoleMsg(route_id_, 
console_message); - msg->set_unblock(true); - Send(msg); + client_->OnConsoleMessage(message); } void CommandBufferStub::CacheShader(const std::string& key, @@ -692,6 +705,17 @@ void CommandBufferStub::SetMemoryTrackerFactoryForTesting( SetOrGetMemoryTrackerFactory(factory); } +void CommandBufferStub::BindEndpoints( + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote client) { + DCHECK(!receiver_); + DCHECK(!client_); + + IPC::ScopedAllowOffSequenceChannelAssociatedBindings allow_binding; + receiver_.Bind(std::move(receiver), scheduler_task_runner_); + client_.Bind(std::move(client)); +} + MemoryTracker* CommandBufferStub::GetMemoryTracker() const { return memory_tracker_.get(); } @@ -708,7 +732,7 @@ void CommandBufferStub::RegisterTransferBufferForTest( void CommandBufferStub::CheckContextLost() { DCHECK(command_buffer_); - CommandBuffer::State state = command_buffer_->GetState(); + gpu::CommandBuffer::State state = command_buffer_->GetState(); // Check the error reason and robustness extension to get a better idea if the // GL context was lost. We might try restarting the GPU process to recover diff --git a/gpu/ipc/service/command_buffer_stub.h b/gpu/ipc/service/command_buffer_stub.h index a355c61eeeefb2..e0f820e3b856ef 100644 --- a/gpu/ipc/service/command_buffer_stub.h +++ b/gpu/ipc/service/command_buffer_stub.h @@ -33,6 +33,8 @@ #include "gpu/ipc/service/gpu_ipc_service_export.h" #include "ipc/ipc_listener.h" #include "ipc/ipc_sender.h" +#include "mojo/public/cpp/bindings/associated_receiver.h" +#include "mojo/public/cpp/bindings/associated_remote.h" #include "ui/gfx/geometry/size.h" #include "ui/gfx/gpu_memory_buffer.h" #include "ui/gfx/swap_result.h" @@ -47,13 +49,23 @@ class MemoryTracker; struct SyncToken; struct WaitForCommandState; class GpuChannel; +class SchedulerTaskRunner; class SyncPointClientState; +// CommandBufferStub is a base class for different CommandBuffer backends +// (e.g. GLES2, Raster, WebGPU) within the GPU service. Each instance lives on +// the main thread and receives IPCs there, either dispatched to the default +// main thread TaskRunner, or a specific main-thread sequence on the GPU +// Scheduler. +// +// For every CommandBufferStub instance, there's a corresponding +// CommandBufferProxyImpl client. class GPU_IPC_SERVICE_EXPORT CommandBufferStub : public IPC::Listener, public IPC::Sender, public CommandBufferServiceClient, public DecoderClient, + public mojom::CommandBuffer, public base::SupportsWeakPtr { public: class DestructionObserver { @@ -84,6 +96,11 @@ class GPU_IPC_SERVICE_EXPORT CommandBufferStub const mojom::CreateCommandBufferParams& params, base::UnsafeSharedMemoryRegion shared_state_shm) = 0; + // Establish Mojo bindings for the receiver and client endpoints. + void BindEndpoints( + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote client); + MemoryTracker* GetMemoryTracker() const; virtual MemoryTracker* GetContextGroupMemoryTracker() const = 0; @@ -96,7 +113,7 @@ class GPU_IPC_SERVICE_EXPORT CommandBufferStub // `callback` is invoked with the last known State once this occurs, or with // an invalid State if the CommandBuffer is destroyed first. 
using WaitForStateCallback = - base::OnceCallback; + base::OnceCallback; void WaitForTokenInRange(int32_t start, int32_t end, WaitForStateCallback callback); @@ -167,6 +184,9 @@ class GPU_IPC_SERVICE_EXPORT CommandBufferStub scoped_refptr share_group() { return share_group_; } protected: + // mojom::CommandBuffer: + void SetGetBuffer(int32_t shm_id) override; + virtual bool HandleMessage(const IPC::Message& message) = 0; virtual void OnTakeFrontBuffer(const Mailbox& mailbox) {} virtual void OnReturnFrontBuffer(const Mailbox& mailbox, bool is_lost) {} @@ -209,6 +229,7 @@ class GPU_IPC_SERVICE_EXPORT CommandBufferStub const CommandBufferId command_buffer_id_; const SequenceId sequence_id_; + const scoped_refptr scheduler_task_runner_; const int32_t stream_id_; const int32_t route_id_; @@ -220,7 +241,6 @@ class GPU_IPC_SERVICE_EXPORT CommandBufferStub gles2::ProgramCache::ScopedCacheUse CreateCacheUse(); // Message handlers: - void OnSetGetBuffer(int32_t shm_id); void OnGetState(IPC::Message* reply_message); void OnAsyncFlush(int32_t put_offset, uint32_t flush_id, @@ -276,6 +296,9 @@ class GPU_IPC_SERVICE_EXPORT CommandBufferStub std::unique_ptr wait_for_get_offset_; uint32_t wait_set_get_buffer_count_; + mojo::AssociatedReceiver receiver_{this}; + mojo::AssociatedRemote client_; + DISALLOW_COPY_AND_ASSIGN(CommandBufferStub); }; diff --git a/gpu/ipc/service/gpu_channel.cc b/gpu/ipc/service/gpu_channel.cc index f3909c09b54e04..28786f5419e227 100644 --- a/gpu/ipc/service/gpu_channel.cc +++ b/gpu/ipc/service/gpu_channel.cc @@ -145,10 +145,13 @@ class GPU_IPC_SERVICE_EXPORT GpuChannelMessageFilter void CrashForTesting() override; void TerminateForTesting() override; void Flush(FlushCallback callback) override; - void CreateCommandBuffer(mojom::CreateCommandBufferParamsPtr config, - int32_t routing_id, - base::UnsafeSharedMemoryRegion shared_state, - CreateCommandBufferCallback callback) override; + void CreateCommandBuffer( + mojom::CreateCommandBufferParamsPtr config, + int32_t routing_id, + base::UnsafeSharedMemoryRegion shared_state, + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote client, + CreateCommandBufferCallback callback) override; void DestroyCommandBuffer(int32_t routing_id, DestroyCommandBufferCallback callback) override; void ScheduleImageDecode(mojom::ScheduleImageDecodeParamsPtr params, @@ -411,6 +414,8 @@ void GpuChannelMessageFilter::CreateCommandBuffer( mojom::CreateCommandBufferParamsPtr params, int32_t routing_id, base::UnsafeSharedMemoryRegion shared_state, + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote client, CreateCommandBufferCallback callback) { base::AutoLock auto_lock(gpu_channel_lock_); if (!gpu_channel_) { @@ -422,7 +427,8 @@ void GpuChannelMessageFilter::CreateCommandBuffer( FROM_HERE, base::BindOnce(&gpu::GpuChannel::CreateCommandBuffer, gpu_channel_->AsWeakPtr(), std::move(params), routing_id, - std::move(shared_state), + std::move(shared_state), std::move(receiver), + std::move(client), base::BindPostTask(base::ThreadTaskRunnerHandle::Get(), std::move(callback)))); } @@ -847,6 +853,8 @@ void GpuChannel::CreateCommandBuffer( mojom::CreateCommandBufferParamsPtr init_params, int32_t route_id, base::UnsafeSharedMemoryRegion shared_state_shm, + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote client, mojom::GpuChannel::CreateCommandBufferCallback callback) { ScopedCreateCommandBufferResponder responder(std::move(callback)); TRACE_EVENT2("gpu", "GpuChannel::CreateCommandBuffer", 
"route_id", route_id, @@ -943,6 +951,8 @@ void GpuChannel::CreateCommandBuffer( return; } + stub->BindEndpoints(std::move(receiver), std::move(client)); + responder.set_result(ContextResult::kSuccess); responder.set_capabilities(stub->decoder_context()->GetCapabilities()); stubs_[route_id] = std::move(stub); diff --git a/gpu/ipc/service/gpu_channel.h b/gpu/ipc/service/gpu_channel.h index c821f92cec8649..002b64c720a6c3 100644 --- a/gpu/ipc/service/gpu_channel.h +++ b/gpu/ipc/service/gpu_channel.h @@ -192,6 +192,8 @@ class GPU_IPC_SERVICE_EXPORT GpuChannel : public IPC::Listener, mojom::CreateCommandBufferParamsPtr init_params, int32_t routing_id, base::UnsafeSharedMemoryRegion shared_state_shm, + mojo::PendingAssociatedReceiver receiver, + mojo::PendingAssociatedRemote client, mojom::GpuChannel::CreateCommandBufferCallback callback); void DestroyCommandBuffer(int32_t routing_id); bool CreateStreamTexture(int32_t stream_id); diff --git a/gpu/ipc/service/gpu_channel_test_common.cc b/gpu/ipc/service/gpu_channel_test_common.cc index 672863a0dd5747..e89c94734c61bf 100644 --- a/gpu/ipc/service/gpu_channel_test_common.cc +++ b/gpu/ipc/service/gpu_channel_test_common.cc @@ -20,6 +20,8 @@ #include "gpu/ipc/service/gpu_channel.h" #include "gpu/ipc/service/gpu_channel_manager.h" #include "gpu/ipc/service/gpu_channel_manager_delegate.h" +#include "mojo/public/cpp/bindings/associated_receiver.h" +#include "mojo/public/cpp/bindings/associated_remote.h" #include "ui/gl/init/gl_factory.h" #include "ui/gl/test/gl_surface_test_support.h" #include "url/gurl.h" @@ -126,8 +128,12 @@ void GpuChannelTestCommon::CreateCommandBuffer( Capabilities* out_capabilities) { base::RunLoop loop; auto quit = loop.QuitClosure(); + mojo::PendingAssociatedRemote remote; + mojo::PendingAssociatedRemote client; + ignore_result(client.InitWithNewEndpointAndPassReceiver()); channel.CreateCommandBuffer( std::move(init_params), routing_id, std::move(shared_state), + remote.InitWithNewEndpointAndPassReceiver(), std::move(client), base::BindLambdaForTesting( [&](ContextResult result, const Capabilities& capabilities) { *out_result = result; diff --git a/gpu/ipc/service/image_decode_accelerator_stub_unittest.cc b/gpu/ipc/service/image_decode_accelerator_stub_unittest.cc index ae11064ddb45e3..e939c3fd106ceb 100644 --- a/gpu/ipc/service/image_decode_accelerator_stub_unittest.cc +++ b/gpu/ipc/service/image_decode_accelerator_stub_unittest.cc @@ -325,7 +325,10 @@ class ImageDecodeAcceleratorStubTest channel->LookupCommandBuffer(kCommandBufferRouteId); ASSERT_TRUE(command_buffer); - // Make sure there are no pending tasks before starting the test. + // Make sure there are no pending tasks before starting the test. Command + // buffer creation creates some throw-away Mojo endpoints that will post + // some tasks. 
+ base::RunLoop().RunUntilIdle(); ASSERT_TRUE(task_environment().MainThreadIsIdle()); } diff --git a/ipc/ipc_mojo_bootstrap.cc b/ipc/ipc_mojo_bootstrap.cc index 4c84885085552a..2afe03e846fcf5 100644 --- a/ipc/ipc_mojo_bootstrap.cc +++ b/ipc/ipc_mojo_bootstrap.cc @@ -27,6 +27,7 @@ #include "base/synchronization/lock.h" #include "base/task/common/task_annotator.h" #include "base/threading/thread_checker.h" +#include "base/threading/thread_local.h" #include "base/threading/thread_task_runner_handle.h" #include "base/trace_event/memory_allocator_dump.h" #include "base/trace_event/memory_dump_manager.h" @@ -51,6 +52,15 @@ namespace { class ChannelAssociatedGroupController; +base::ThreadLocalBoolean& GetOffSequenceBindingAllowedFlag() { + static base::NoDestructor flag; + return *flag; +} + +bool CanBindOffSequence() { + return GetOffSequenceBindingAllowedFlag().Get(); +} + // Used to track some internal Channel state in pursuit of message leaks. // // TODO(https://crbug.com/813045): Remove this. @@ -506,6 +516,8 @@ class ChannelAssociatedGroupController return task_runner_.get(); } + bool was_bound_off_sequence() const { return was_bound_off_sequence_; } + mojo::InterfaceEndpointClient* client() const { controller_->lock_.AssertAcquired(); return client_; @@ -516,16 +528,25 @@ class ChannelAssociatedGroupController controller_->lock_.AssertAcquired(); DCHECK(!client_); DCHECK(!closed_); - DCHECK(runner->RunsTasksInCurrentSequence()); task_runner_ = std::move(runner); client_ = client; + + const bool binding_to_calling_sequence = + task_runner_->RunsTasksInCurrentSequence(); + const bool binding_to_channel_sequence = + binding_to_calling_sequence && + (controller_->proxy_task_runner_->RunsTasksInCurrentSequence() || + controller_->task_runner_->RunsTasksInCurrentSequence()); + const bool tried_to_bind_off_sequence = + !binding_to_calling_sequence || !binding_to_channel_sequence; + if (tried_to_bind_off_sequence && CanBindOffSequence()) + was_bound_off_sequence_ = true; } void DetachClient() { controller_->lock_.AssertAcquired(); DCHECK(client_); - DCHECK(task_runner_->RunsTasksInCurrentSequence()); DCHECK(!closed_); task_runner_ = nullptr; @@ -667,6 +688,7 @@ class ChannelAssociatedGroupController bool closed_ = false; bool peer_closed_ = false; bool handle_created_ = false; + bool was_bound_off_sequence_ = false; absl::optional disconnect_reason_; mojo::InterfaceEndpointClient* client_ = nullptr; scoped_refptr task_runner_; @@ -868,13 +890,34 @@ class ChannelAssociatedGroupController mojo::InterfaceEndpointClient* client = endpoint->client(); if (!client || !endpoint->task_runner()->RunsTasksInCurrentSequence()) { - // No client has been bound yet or the client runs tasks on another - // thread. We assume the other thread must always be the one on which - // |proxy_task_runner_| runs tasks, since that's the only valid scenario. + // The ChannelProxy for this channel is bound to `proxy_task_runner_` and + // by default legacy IPCs must dispatch to either the IO thread or the + // proxy task runner. We generally impose the same constraint on + // associated interface endpoints so that FIFO can be guaranteed across + // all interfaces without stalling any of them to wait for a pending + // endpoint to be bound. + // + // This allows us to assume that if an endpoint is not yet bound when we + // receive a message targeting it, it *will* be bound on the proxy task + // runner by the time a newly posted task runs there. Hence we simply post + // a hopeful dispatch task to that task runner. 
// - // If the client is not yet bound, it must be bound by the time this task - // runs or else it's programmer error. - DCHECK(proxy_task_runner_); + // As it turns out, there are even some instances of endpoints binding to + // alternative (non-IO-thread, non-proxy) task runners, but still + // ultimately relying on the fact that we schedule their messages on the + // proxy task runner. So even if the endpoint is already bound, we + // default to scheduling it on the proxy task runner as long as it's not + // bound specifically to the IO task runner. + // TODO(rockot): Try to sort out these cases and maybe eliminate them. + // + // Finally, it's also possible that an endpoint was bound to an + // alternative task runner and it really does want its messages to + // dispatch there. In that case `was_bound_off_sequence()` will be true to + // signal that we should really use that task runner. + const scoped_refptr task_runner = + client && endpoint->was_bound_off_sequence() + ? endpoint->task_runner() + : proxy_task_runner_.get(); if (message->has_flag(mojo::Message::kFlagIsSync)) { MessageWrapper message_wrapper(this, std::move(*message)); @@ -885,28 +928,27 @@ class ChannelAssociatedGroupController // call will dequeue the message and dispatch it. uint32_t message_id = endpoint->EnqueueSyncMessage(std::move(message_wrapper)); - proxy_task_runner_->PostTask( + task_runner->PostTask( FROM_HERE, base::BindOnce(&ChannelAssociatedGroupController::AcceptSyncMessage, this, id, message_id)); return true; } - // If |proxy_task_runner_| has been torn down already, this PostTask will - // fail and destroy |message|. That operation may need to in turn destroy + // If |task_runner| has been torn down already, this PostTask will fail + // and destroy |message|. That operation may need to in turn destroy // in-transit associated endpoints and thus acquire |lock_|. We no longer - // need the lock to be held now since |proxy_task_runner_| is safe to - // access unguarded. + // need the lock to be held now, so we can release it before the PostTask. { // Grab interface name from |client| before releasing the lock to ensure // that |client| is safe to access. base::TaskAnnotator::ScopedSetIpcHash scoped_set_ipc_hash( client ? client->interface_name() : "unknown interface"); locker.Release(); - proxy_task_runner_->PostTask( + task_runner->PostTask( FROM_HERE, base::BindOnce( - &ChannelAssociatedGroupController::AcceptOnProxyThread, this, + &ChannelAssociatedGroupController::AcceptOnEndpointThread, this, std::move(*message))); } return true; @@ -919,10 +961,9 @@ class ChannelAssociatedGroupController return client->HandleIncomingMessage(message); } - void AcceptOnProxyThread(mojo::Message message) { - DCHECK(proxy_task_runner_->BelongsToCurrentThread()); + void AcceptOnEndpointThread(mojo::Message message) { TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("mojom"), - "ChannelAssociatedGroupController::AcceptOnProxyThread"); + "ChannelAssociatedGroupController::AcceptOnEndpointThread"); mojo::InterfaceId id = message.interface_id(); DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsPrimaryInterfaceId(id)); @@ -939,7 +980,8 @@ class ChannelAssociatedGroupController // Using client->interface_name() is safe here because this is a static // string defined for each mojo interface. 
TRACE_EVENT0("mojom", client->interface_name()); - DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); + DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence() || + proxy_task_runner_->RunsTasksInCurrentSequence()); // Sync messages should never make their way to this method. DCHECK(!message.has_flag(mojo::Message::kFlagIsSync)); @@ -1136,6 +1178,17 @@ class MojoBootstrapImpl : public MojoBootstrap { } // namespace +ScopedAllowOffSequenceChannelAssociatedBindings:: + ScopedAllowOffSequenceChannelAssociatedBindings() + : outer_flag_(GetOffSequenceBindingAllowedFlag().Get()) { + GetOffSequenceBindingAllowedFlag().Set(true); +} + +ScopedAllowOffSequenceChannelAssociatedBindings:: + ~ScopedAllowOffSequenceChannelAssociatedBindings() { + GetOffSequenceBindingAllowedFlag().Set(outer_flag_); +} + // static std::unique_ptr MojoBootstrap::Create( mojo::ScopedMessagePipeHandle handle, diff --git a/ipc/ipc_mojo_bootstrap.h b/ipc/ipc_mojo_bootstrap.h index d231ab2cb84a2a..5363dfd06c7239 100644 --- a/ipc/ipc_mojo_bootstrap.h +++ b/ipc/ipc_mojo_bootstrap.h @@ -25,6 +25,66 @@ namespace IPC { +// Incoming legacy IPCs have always been dispatched to one of two threads: the +// IO thread (when an installed MessageFilter handles the message), or the +// thread which owns the corresponding ChannelProxy receiving the message. There +// were no other places to route legacy IPC messages, so when a message arrived +// the legacy IPC system would run through its MessageFilters and if the message +// was still unhandled, it would be posted to the ChannelProxy thread for +// further processing. +// +// Mojo on the other hand allows for mutually associated endpoints (that is, +// endpoints which locally share the same message pipe) to span any number of +// threads while still guaranteeing that each endpoint on a given thread +// preserves FIFO order of messages dispatched there. This means that if a +// message arrives carrying a PendingAssociatedRemote/Receiver endpoint, and +// then another message arrives which targets that endpoint, the entire pipe +// will be blocked from dispatch until the endpoint is bound: otherwise we have +// no idea where to dispatch the message such that we can uphold the FIFO +// guarantee between the new endpoint and any other endpoints on the thread it +// ends up binding to. +// +// Channel-associated interfaces share a message pipe with the legacy IPC +// Channel, and in order to avoid nasty surprises during the migration process +// we decided to constrain how incoming Channel-associated endpoints could be +// bound: you must either bind them immediately as they arrive on the IO thread, +// or you must immediately post a task to the ChannelProxy thread to bind them. +// This allows all aforementioned FIFO guaratees to be upheld without ever +// stalling dispatch of legacy IPCs (particularly on the IO thread), because +// when we see a message targeting an unbound endpoint we can safely post it to +// the ChannelProxy's task runner before forging ahead to dispatch subsequent +// messages. No stalling. +// +// As there are some cases where a Channel-associated endpoint really wants to +// receive messages on a different TaskRunner, we want to allow that now. It's +// safe as long as the application can guarantee that the endpoint in question +// will be bound to a task runner *before* any messages are received for that +// endpoint. 
+// +// HOWEVER, it turns out that we cannot simply adhere to the application's +// wishes when an alternative TaskRunner is provided at binding time: over time +// we have accumulated application code which binds Channel-associated endpoints +// to task runners which -- while running tasks exclusively on the ChannelProxy +// thread -- are not the ChannelProxy's own task runner. Such code now +// implicitly relies on the behavior of Channel-associated interfaces always +// dispatching their messages to the ChannelProxy task runner. This is tracked +// by https://crbug.com/1209188. +// +// Finally, the point: if you really know you want to bind your endpoint to an +// alternative task runner and you can really guarantee that no messages may +// have already arrived for it on the IO thread, you can do the binding within +// the extent of a ScopedAllowOffSequenceChannelAssociatedBindings. This will +// flag the endpoint such that it honors your binding configuration, and its +// incoming messages will actually dispatch to the task runner you provide. +class COMPONENT_EXPORT(IPC) ScopedAllowOffSequenceChannelAssociatedBindings { + public: + ScopedAllowOffSequenceChannelAssociatedBindings(); + ~ScopedAllowOffSequenceChannelAssociatedBindings(); + + private: + const bool outer_flag_; +}; + // MojoBootstrap establishes a pair of associated interfaces between two // processes in Chrome. // diff --git a/mojo/public/cpp/bindings/associated_receiver.h b/mojo/public/cpp/bindings/associated_receiver.h index 92b4c0e8800409..82bf604dc8b524 100644 --- a/mojo/public/cpp/bindings/associated_receiver.h +++ b/mojo/public/cpp/bindings/associated_receiver.h @@ -45,6 +45,12 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) AssociatedReceiverBase { void FlushForTesting(); + // Please see comments on the same method of InterfaceEndpointClient. + void ResetFromAnotherSequenceUnsafe() { + if (endpoint_client_) + endpoint_client_->ResetFromAnotherSequenceUnsafe(); + } + protected: ~AssociatedReceiverBase(); diff --git a/mojo/public/cpp/bindings/interface_endpoint_client.h b/mojo/public/cpp/bindings/interface_endpoint_client.h index 738a2ab316f29e..71fbae16b71b1a 100644 --- a/mojo/public/cpp/bindings/interface_endpoint_client.h +++ b/mojo/public/cpp/bindings/interface_endpoint_client.h @@ -192,6 +192,15 @@ class COMPONENT_EXPORT(MOJO_CPP_BINDINGS) InterfaceEndpointClient } #endif + // This allows the endpoint to be reset from a sequence other than the one on + // which it was bound. This should only be used with caution, and it is + // critical that the calling sequence cannot run tasks concurrently with the + // bound sequence. There's no practical way for this to be asserted, so we + // have to take your word for it. If this constraint is not upheld, there will + // be data races internal to the bindings object which can lead to UAFs or + // surprise message dispatches. + void ResetFromAnotherSequenceUnsafe(); + private: // Maps from the id of a response to the MessageReceiver that handles the // response. 
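// ---------------------------------------------------------------------------
// A minimal sketch (not from this CL) of how the two relaxations above are
// meant to be combined, mirroring what command_buffer_stub.cc does in this
// patch. `MyStub` and its members are hypothetical names; the scoper, the
// SchedulerTaskRunner, and ResetFromAnotherSequenceUnsafe() are the APIs this
// patch introduces. Construction details and error handling are omitted.

#include <utility>

#include "base/memory/scoped_refptr.h"
#include "gpu/command_buffer/service/scheduler_task_runner.h"
#include "gpu/ipc/common/gpu_channel.mojom.h"
#include "ipc/ipc_mojo_bootstrap.h"
#include "mojo/public/cpp/bindings/associated_receiver.h"

class MyStub : public gpu::mojom::CommandBuffer {
 public:
  // Runs on the GPU main thread. The receiver is bound to a scheduler
  // sequence rather than the calling sequence; the scoper relaxes the
  // bindings' usual same-sequence check at bind time.
  void Bind(
      mojo::PendingAssociatedReceiver<gpu::mojom::CommandBuffer> receiver) {
    IPC::ScopedAllowOffSequenceChannelAssociatedBindings allow_binding;
    receiver_.Bind(std::move(receiver), scheduler_task_runner_);
  }

  // Also runs on the GPU main thread, i.e. not the bound sequence. Shutting
  // down the SchedulerTaskRunner first guarantees that no further dispatch
  // tasks can run there, which is what makes the off-sequence reset safe.
  void Destroy() {
    scheduler_task_runner_->ShutDown();
    receiver_.ResetFromAnotherSequenceUnsafe();
  }

  // gpu::mojom::CommandBuffer:
  void SetGetBuffer(int32_t shm_id) override {}

 private:
  scoped_refptr<gpu::SchedulerTaskRunner> scheduler_task_runner_;
  mojo::AssociatedReceiver<gpu::mojom::CommandBuffer> receiver_{this};
};
// ---------------------------------------------------------------------------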
diff --git a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc index 4ea3c21fd65ea4..96d4390430f264 100644 --- a/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc +++ b/mojo/public/cpp/bindings/lib/interface_endpoint_client.cc @@ -442,11 +442,6 @@ InterfaceEndpointClient::InterfaceEndpointClient( base::BindOnce(&InterfaceEndpointClient::OnAssociationEvent, weak_ptr_factory_.GetWeakPtr()))); } - } else if (!task_runner_->RunsTasksInCurrentSequence()) { - task_runner_->PostTask( - FROM_HERE, - base::BindOnce(&InterfaceEndpointClient::InitControllerIfNecessary, - weak_ptr_factory_.GetWeakPtr())); } else { InitControllerIfNecessary(); } @@ -777,6 +772,17 @@ void InterfaceEndpointClient::MaybeSendNotifyIdle() { } } +void InterfaceEndpointClient::ResetFromAnotherSequenceUnsafe() { + DETACH_FROM_SEQUENCE(sequence_checker_); + + if (controller_) { + controller_ = nullptr; + handle_.group_controller()->DetachEndpointClient(handle_); + } + + handle_.reset(); +} + void InterfaceEndpointClient::InitControllerIfNecessary() { if (controller_ || handle_.pending_association()) return; diff --git a/mojo/public/cpp/bindings/lib/multiplex_router.cc b/mojo/public/cpp/bindings/lib/multiplex_router.cc index 6039e764a714ce..fcec0a407d4cf7 100644 --- a/mojo/public/cpp/bindings/lib/multiplex_router.cc +++ b/mojo/public/cpp/bindings/lib/multiplex_router.cc @@ -88,7 +88,6 @@ class MultiplexRouter::InterfaceEndpoint router_->AssertLockAcquired(); DCHECK(!client_); DCHECK(!closed_); - DCHECK(runner->RunsTasksInCurrentSequence()); task_runner_ = std::move(runner); client_ = client; @@ -99,7 +98,6 @@ class MultiplexRouter::InterfaceEndpoint void DetachClient() { router_->AssertLockAcquired(); DCHECK(client_); - DCHECK(task_runner_->RunsTasksInCurrentSequence()); DCHECK(!closed_); task_runner_ = nullptr; diff --git a/mojo/public/cpp/bindings/shared_associated_remote.h b/mojo/public/cpp/bindings/shared_associated_remote.h index 1b592831045b9d..4a3f37800d9456 100644 --- a/mojo/public/cpp/bindings/shared_associated_remote.h +++ b/mojo/public/cpp/bindings/shared_associated_remote.h @@ -5,6 +5,7 @@ #ifndef MOJO_PUBLIC_CPP_BINDINGS_SHARED_ASSOCIATED_REMOTE_H_ #define MOJO_PUBLIC_CPP_BINDINGS_SHARED_ASSOCIATED_REMOTE_H_ +#include "base/memory/ref_counted.h" #include "base/threading/sequenced_task_runner_handle.h" #include "mojo/public/cpp/bindings/associated_remote.h" #include "mojo/public/cpp/bindings/pending_associated_remote.h" @@ -32,12 +33,10 @@ class SharedAssociatedRemote { explicit SharedAssociatedRemote( PendingAssociatedRemote pending_remote, scoped_refptr bind_task_runner = - base::SequencedTaskRunnerHandle::Get()) - : remote_(pending_remote.is_valid() - ? SharedRemoteBase>::Create( - std::move(pending_remote), - std::move(bind_task_runner)) - : nullptr) {} + base::SequencedTaskRunnerHandle::Get()) { + if (pending_remote.is_valid()) + Bind(std::move(pending_remote), std::move(bind_task_runner)); + } bool is_bound() const { return remote_ != nullptr; } explicit operator bool() const { return is_bound(); } @@ -51,6 +50,27 @@ class SharedAssociatedRemote { // reference the same underlying endpoint. void reset() { remote_.reset(); } + // Creates a new pair of endpoints and binds this SharedAssociatedRemote to + // one of them, on `task_runner`. The other is returned as a receiver. 
+ mojo::PendingAssociatedReceiver BindNewEndpointAndPassReceiver( + scoped_refptr bind_task_runner = + base::SequencedTaskRunnerHandle::Get()) { + mojo::PendingAssociatedRemote remote; + auto receiver = remote.InitWithNewEndpointAndPassReceiver(); + Bind(std::move(remote), std::move(bind_task_runner)); + return receiver; + } + + // Binds to `pending_remote` on `bind_task_runner`. + void Bind(PendingAssociatedRemote pending_remote, + scoped_refptr bind_task_runner = + base::SequencedTaskRunnerHandle::Get()) { + DCHECK(!remote_); + DCHECK(pending_remote.is_valid()); + remote_ = SharedRemoteBase>::Create( + std::move(pending_remote), std::move(bind_task_runner)); + } + private: scoped_refptr>> remote_; };
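// ---------------------------------------------------------------------------
// A minimal sketch (not from this CL) of client-side use of the new
// SharedAssociatedRemote::BindNewEndpointAndPassReceiver() helper above,
// condensed from CommandBufferProxyImpl::Initialize() in this patch. `MyProxy`
// is a hypothetical client class implementing gpu::mojom::CommandBufferClient,
// with members `command_buffer_` (a
// mojo::SharedAssociatedRemote<gpu::mojom::CommandBuffer>), `client_receiver_`
// (a mojo::AssociatedReceiver<gpu::mojom::CommandBufferClient>), `channel_`,
// `route_id_`, and `capabilities_` standing in for state the real method
// prepares earlier.

gpu::ContextResult MyProxy::Initialize(
    gpu::mojom::CreateCommandBufferParamsPtr params,
    base::UnsafeSharedMemoryRegion region) {
  gpu::ContextResult result = gpu::ContextResult::kSuccess;
  mojo::SyncCallRestrictions::ScopedAllowSyncCall allow_sync;
  IPC::ScopedAllowOffSequenceChannelAssociatedBindings allow_binding;
  bool sent = channel_->GetGpuChannel().CreateCommandBuffer(
      std::move(params), route_id_, std::move(region),
      // CommandBuffer calls may come from any client thread; they are
      // serialized on the channel's IO task runner.
      command_buffer_.BindNewEndpointAndPassReceiver(
          channel_->io_task_runner()),
      // CommandBufferClient messages dispatch back on this proxy's thread.
      client_receiver_.BindNewEndpointAndPassRemote(), &result,
      &capabilities_);
  if (!sent || result != gpu::ContextResult::kSuccess) {
    // On failure, both endpoints must be reset before reuse, just as
    // Initialize() does in this patch.
    command_buffer_.reset();
    client_receiver_.reset();
    return sent ? result : gpu::ContextResult::kTransientFailure;
  }
  return result;
}
// ---------------------------------------------------------------------------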