Skip to content

Commit

Permalink
Mojo C++ Bindings: Movable mojo::Message
Browse files Browse the repository at this point in the history
Gets rid of MoveTo() in favor of proper move construction
and assignment.

Eliminates a level of indirection in several places where
std::unique_ptr<mojo::Message> was used to pass a Message
around, e.g. across threads.

BUG=None
R=yzshen@chromium.org

Review-Url: https://codereview.chromium.org/2262633002
Cr-Commit-Position: refs/heads/master@{#413197}
  • Loading branch information
krockot authored and Commit bot committed Aug 19, 2016
1 parent 3298410 commit c4cc691
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 107 deletions.
2 changes: 1 addition & 1 deletion ipc/ipc_message_pipe_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MessageSerializer : public mojo::MessageReceiverWithResponder {
private:
// mojo::MessageReceiverWithResponder
bool Accept(mojo::Message* message) override {
message->MoveTo(&message_);
message_ = std::move(*message);
return true;
}

Expand Down
60 changes: 24 additions & 36 deletions ipc/ipc_mojo_bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ class ChannelAssociatedGroupController
base::Bind(&ChannelAssociatedGroupController::OnPipeError,
base::Unretained(this)));

std::vector<std::unique_ptr<mojo::Message>> outgoing_messages;
std::vector<mojo::Message> outgoing_messages;
std::swap(outgoing_messages, outgoing_messages_);
for (auto& message : outgoing_messages)
SendMessage(message.get());
SendMessage(&message);
}

void CreateChannelEndpoints(mojom::ChannelAssociatedPtr* sender,
Expand Down Expand Up @@ -279,7 +279,7 @@ class ChannelAssociatedGroupController
sync_watcher_.reset();
}

uint32_t EnqueueSyncMessage(std::unique_ptr<mojo::Message> message) {
uint32_t EnqueueSyncMessage(mojo::Message message) {
controller_->lock_.AssertAcquired();
uint32_t id = GenerateSyncMessageId();
sync_messages_.emplace(id, std::move(message));
Expand All @@ -293,12 +293,11 @@ class ChannelAssociatedGroupController
sync_message_event_->Signal();
}

std::unique_ptr<mojo::Message> PopSyncMessage(uint32_t id) {
mojo::Message PopSyncMessage(uint32_t id) {
controller_->lock_.AssertAcquired();
if (sync_messages_.empty() || sync_messages_.front().first != id)
return nullptr;
std::unique_ptr<mojo::Message> message =
std::move(sync_messages_.front().second);
return mojo::Message();
mojo::Message message = std::move(sync_messages_.front().second);
sync_messages_.pop();
return message;
}
Expand Down Expand Up @@ -352,15 +351,14 @@ class ChannelAssociatedGroupController
base::AutoLock locker(controller_->lock_);
bool more_to_process = false;
if (!sync_messages_.empty()) {
std::unique_ptr<mojo::Message> message(
std::move(sync_messages_.front().second));
mojo::Message message = std::move(sync_messages_.front().second);
sync_messages_.pop();

bool dispatch_succeeded;
mojo::InterfaceEndpointClient* client = client_;
{
base::AutoUnlock unlocker(controller_->lock_);
dispatch_succeeded = client->HandleIncomingMessage(message.get());
dispatch_succeeded = client->HandleIncomingMessage(&message);
}

if (!sync_messages_.empty())
Expand Down Expand Up @@ -426,8 +424,7 @@ class ChannelAssociatedGroupController
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
std::unique_ptr<mojo::SyncHandleWatcher> sync_watcher_;
std::unique_ptr<MojoEvent> sync_message_event_;
std::queue<std::pair<uint32_t, std::unique_ptr<mojo::Message>>>
sync_messages_;
std::queue<std::pair<uint32_t, mojo::Message>> sync_messages_;
uint32_t next_sync_message_id_ = 0;

DISALLOW_COPY_AND_ASSIGN(Endpoint);
Expand Down Expand Up @@ -470,30 +467,26 @@ class ChannelAssociatedGroupController
DCHECK(thread_checker_.CalledOnValidThread());
if (!connector_) {
// Pipe may not be bound yet, so we queue the message.
std::unique_ptr<mojo::Message> queued_message(new mojo::Message);
message->MoveTo(queued_message.get());
outgoing_messages_.emplace_back(std::move(queued_message));
outgoing_messages_.emplace_back(std::move(*message));
return true;
}
return connector_->Accept(message);
} else {
// We always post tasks to the master endpoint thread when called from the
// proxy thread in order to simulate IPC::ChannelProxy::Send behavior.
DCHECK(proxy_task_runner_->BelongsToCurrentThread());
std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
message->MoveTo(passed_message.get());
task_runner_->PostTask(
FROM_HERE,
base::Bind(
&ChannelAssociatedGroupController::SendMessageOnMasterThread,
this, base::Passed(&passed_message)));
this, base::Passed(message)));
return true;
}
}

void SendMessageOnMasterThread(std::unique_ptr<mojo::Message> message) {
void SendMessageOnMasterThread(mojo::Message message) {
DCHECK(thread_checker_.CalledOnValidThread());
if (!SendMessage(message.get()))
if (!SendMessage(&message))
RaiseError();
}

Expand Down Expand Up @@ -611,17 +604,13 @@ class ChannelAssociatedGroupController
// runs or else it's programmer error.
DCHECK(proxy_task_runner_);

std::unique_ptr<mojo::Message> passed_message(new mojo::Message);
message->MoveTo(passed_message.get());

if (passed_message->has_flag(mojo::Message::kFlagIsSync)) {
if (message->has_flag(mojo::Message::kFlagIsSync)) {
// Sync messages may need to be handled by the endpoint if it's blocking
// on a sync reply. We pass ownership of the message to the endpoint's
// sync message queue. If the endpoint was blocking, it will dequeue the
// message and dispatch it. Otherwise the posted |AcceptSyncMessage()|
// call will dequeue the message and dispatch it.
uint32_t message_id =
endpoint->EnqueueSyncMessage(std::move(passed_message));
uint32_t message_id = endpoint->EnqueueSyncMessage(std::move(*message));
proxy_task_runner_->PostTask(
FROM_HERE,
base::Bind(&ChannelAssociatedGroupController::AcceptSyncMessage,
Expand All @@ -632,7 +621,7 @@ class ChannelAssociatedGroupController
proxy_task_runner_->PostTask(
FROM_HERE,
base::Bind(&ChannelAssociatedGroupController::AcceptOnProxyThread,
this, base::Passed(&passed_message)));
this, base::Passed(message)));
return true;
}

Expand All @@ -645,10 +634,10 @@ class ChannelAssociatedGroupController
return client->HandleIncomingMessage(message);
}

void AcceptOnProxyThread(std::unique_ptr<mojo::Message> message) {
void AcceptOnProxyThread(mojo::Message message) {
DCHECK(proxy_task_runner_->BelongsToCurrentThread());

mojo::InterfaceId id = message->interface_id();
mojo::InterfaceId id = message.interface_id();
DCHECK(mojo::IsValidInterfaceId(id) && !mojo::IsMasterInterfaceId(id));

base::AutoLock locker(lock_);
Expand All @@ -663,12 +652,12 @@ class ChannelAssociatedGroupController
DCHECK(endpoint->task_runner()->BelongsToCurrentThread());

// Sync messages should never make their way to this method.
DCHECK(!message->has_flag(mojo::Message::kFlagIsSync));
DCHECK(!message.has_flag(mojo::Message::kFlagIsSync));

bool result = false;
{
base::AutoUnlock unlocker(lock_);
result = client->HandleIncomingMessage(message.get());
result = client->HandleIncomingMessage(&message);
}

if (!result)
Expand All @@ -684,12 +673,11 @@ class ChannelAssociatedGroupController
return;

DCHECK(endpoint->task_runner()->BelongsToCurrentThread());
std::unique_ptr<mojo::Message> message =
endpoint->PopSyncMessage(message_id);
mojo::Message message = endpoint->PopSyncMessage(message_id);

// The message must have already been dequeued by the endpoint waking up
// from a sync wait. Nothing to do.
if (!message)
if (message.IsNull())
return;

mojo::InterfaceEndpointClient* client = endpoint->client();
Expand All @@ -699,7 +687,7 @@ class ChannelAssociatedGroupController
bool result = false;
{
base::AutoUnlock unlocker(lock_);
result = client->HandleIncomingMessage(message.get());
result = client->HandleIncomingMessage(&message);
}

if (!result)
Expand Down Expand Up @@ -771,7 +759,7 @@ class ChannelAssociatedGroupController

// Outgoing messages that were sent before this controller was bound to a
// real message pipe.
std::vector<std::unique_ptr<mojo::Message>> outgoing_messages_;
std::vector<mojo::Message> outgoing_messages_;

// Guards the fields below for thread-safe access.
base::Lock lock_;
Expand Down
2 changes: 1 addition & 1 deletion mojo/public/cpp/bindings/interface_endpoint_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class InterfaceEndpointClient : public MessageReceiverWithResponder {
explicit SyncResponseInfo(bool* in_response_received);
~SyncResponseInfo();

std::unique_ptr<Message> response;
Message response;

// Points to a stack-allocated variable.
bool* response_received;
Expand Down
9 changes: 3 additions & 6 deletions mojo/public/cpp/bindings/lib/interface_endpoint_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,8 @@ bool InterfaceEndpointClient::AcceptWithResponder(Message* message,
DCHECK(base::ContainsKey(sync_responses_, request_id));
auto iter = sync_responses_.find(request_id);
DCHECK_EQ(&response_received, iter->second->response_received);
if (response_received) {
std::unique_ptr<Message> response = std::move(iter->second->response);
ignore_result(sync_responder->Accept(response.get()));
}
if (response_received)
ignore_result(sync_responder->Accept(&iter->second->response));
sync_responses_.erase(iter);
}

Expand Down Expand Up @@ -292,8 +290,7 @@ bool InterfaceEndpointClient::HandleValidatedMessage(Message* message) {
auto it = sync_responses_.find(request_id);
if (it == sync_responses_.end())
return false;
it->second->response.reset(new Message());
message->MoveTo(it->second->response.get());
it->second->response = std::move(*message);
*it->second->response_received = true;
return true;
}
Expand Down
47 changes: 26 additions & 21 deletions mojo/public/cpp/bindings/lib/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,36 @@ base::LazyInstance<base::ThreadLocalPointer<internal::MessageDispatchContext>>
base::LazyInstance<base::ThreadLocalPointer<SyncMessageResponseContext>>
g_tls_sync_response_context = LAZY_INSTANCE_INITIALIZER;

void DoNotifyBadMessage(Message message, const std::string& error) {
message.NotifyBadMessage(error);
}

} // namespace

Message::Message() {
}

Message::Message(Message&& other)
: buffer_(std::move(other.buffer_)), handles_(std::move(other.handles_)) {
}

Message::~Message() {
CloseHandles();
}

Message& Message::operator=(Message&& other) {
Reset();
std::swap(other.buffer_, buffer_);
std::swap(other.handles_, handles_);
return *this;
}

void Message::Reset() {
CloseHandles();
handles_.clear();
buffer_.reset();
}

void Message::Initialize(size_t capacity, bool zero_initialized) {
DCHECK(!buffer_);
buffer_.reset(new internal::MessageBuffer(capacity, zero_initialized));
Expand All @@ -49,18 +70,6 @@ void Message::InitializeFromMojoMessage(ScopedMessageHandle message,
handles_.swap(*handles);
}

void Message::MoveTo(Message* destination) {
DCHECK(this != destination);

// No copy needed.
std::swap(destination->buffer_, buffer_);
std::swap(destination->handles_, handles_);

CloseHandles();
handles_.clear();
buffer_.reset();
}

ScopedMessageHandle Message::TakeMojoMessage() {
if (handles_.empty()) // Fast path for the common case: No handles.
return buffer_->TakeMessage();
Expand Down Expand Up @@ -127,10 +136,8 @@ void SyncMessageResponseContext::ReportBadMessage(const std::string& error) {
const ReportBadMessageCallback&
SyncMessageResponseContext::GetBadMessageCallback() {
if (bad_message_callback_.is_null()) {
std::unique_ptr<Message> new_message(new Message);
response_.MoveTo(new_message.get());
bad_message_callback_ = base::Bind(&Message::NotifyBadMessage,
base::Owned(new_message.release()));
bad_message_callback_ =
base::Bind(&DoNotifyBadMessage, base::Passed(&response_));
}
return bad_message_callback_;
}
Expand Down Expand Up @@ -200,10 +207,8 @@ MessageDispatchContext* MessageDispatchContext::current() {
const ReportBadMessageCallback&
MessageDispatchContext::GetBadMessageCallback() {
if (bad_message_callback_.is_null()) {
std::unique_ptr<Message> new_message(new Message);
message_->MoveTo(new_message.get());
bad_message_callback_ = base::Bind(&Message::NotifyBadMessage,
base::Owned(new_message.release()));
bad_message_callback_ =
base::Bind(&DoNotifyBadMessage, base::Passed(message_));
}
return bad_message_callback_;
}
Expand All @@ -212,7 +217,7 @@ MessageDispatchContext::GetBadMessageCallback() {
void SyncMessageResponseSetup::SetCurrentSyncResponseMessage(Message* message) {
SyncMessageResponseContext* context = SyncMessageResponseContext::current();
if (context)
message->MoveTo(&context->response_);
context->response_ = std::move(*message);
}

} // namespace internal
Expand Down
Loading

0 comments on commit c4cc691

Please sign in to comment.