From 24e44d30a83ca98eaf909ee916d3a34721113376 Mon Sep 17 00:00:00 2001 From: erikchen Date: Wed, 21 Oct 2015 15:28:54 -0700 Subject: [PATCH] ipc: Allow attachment broker messages to be processed out of order. There are no ordering restrictions on messages destined for the attachment broker, and the message may be needed to dispatch messages waiting in |message_queue_|. This CL introduces a small behavioral change ot ChannelReader - messages for the attachment broker are processed immediately. This CL also includes a small refactor of ChannelReader to split the function TranslateInputData into several, smaller, better documented functions. BUG=535711 Review URL: https://codereview.chromium.org/1421473004 Cr-Commit-Position: refs/heads/master@{#355411} --- ipc/attachment_broker_mac_unittest.cc | 78 ++++++++++-- ipc/ipc_channel_posix.cc | 3 +- ipc/ipc_channel_reader.cc | 163 ++++++++++++++++---------- ipc/ipc_channel_reader.h | 40 +++++-- ipc/ipc_channel_win.cc | 3 +- ipc/ipc_message.cc | 2 + 6 files changed, 204 insertions(+), 85 deletions(-) diff --git a/ipc/attachment_broker_mac_unittest.cc b/ipc/attachment_broker_mac_unittest.cc index 09dfe8489f766a..a78a0ba7948dad 100644 --- a/ipc/attachment_broker_mac_unittest.cc +++ b/ipc/attachment_broker_mac_unittest.cc @@ -368,11 +368,23 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase { broker_.reset(broker); } + // Mach Setup that needs to occur before child processes are forked. + void MachPreForkSetUp() { + service_name_ = IPC::CreateRandomServiceName(); + server_port_.reset(IPC::BecomeMachServer(service_name_.c_str()).release()); + } + + // Mach Setup that needs to occur after child processes are forked. + void MachPostForkSetUp() { + client_port_.reset(IPC::ReceiveMachPort(server_port_.get()).release()); + IPC::SendMachPort( + client_port_.get(), mach_task_self(), MACH_MSG_TYPE_COPY_SEND); + } + // Setup shared between tests. void CommonSetUp(const char* name) { Init(name); - service_name_ = IPC::CreateRandomServiceName(); - server_port_.reset(IPC::BecomeMachServer(service_name_.c_str()).release()); + MachPreForkSetUp(); if (!broker_.get()) SetBroker(new IPC::AttachmentBrokerUnprivilegedMac); @@ -383,9 +395,7 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase { ASSERT_TRUE(ConnectChannel()); ASSERT_TRUE(StartClient()); - client_port_.reset(IPC::ReceiveMachPort(server_port_.get()).release()); - IPC::SendMachPort( - client_port_.get(), mach_task_self(), MACH_MSG_TYPE_COPY_SEND); + MachPostForkSetUp(); active_names_at_start_ = IPC::GetActiveNameCount(); get_proxy_listener()->set_listener(&result_listener_); } @@ -428,6 +438,10 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase { AttachmentBrokerObserver* get_observer() { return &observer_; } ResultListener* get_result_listener() { return &result_listener_; } + protected: + // The number of active names immediately after set up. + mach_msg_type_number_t active_names_at_start_; + private: ProxyListener proxy_listener_; scoped_ptr broker_; @@ -441,9 +455,6 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase { // process. base::mac::ScopedMachSendRight client_port_; - // The number of active names immediately after set up. - mach_msg_type_number_t active_names_at_start_; - std::string service_name_; ResultListener result_listener_; @@ -837,4 +848,55 @@ MULTIPROCESS_IPC_TEST_CLIENT_MAIN(SendSharedMemoryHandleToSelf) { "SendSharedMemoryHandleToSelf"); } +// Similar to SendSharedMemoryHandle, but uses a ChannelProxy instead of a +// Channel. +TEST_F(IPCAttachmentBrokerMacTest, SendSharedMemoryHandleChannelProxy) { + Init("SendSharedMemoryHandleChannelProxy"); + MachPreForkSetUp(); + + SetBroker(new IPC::AttachmentBrokerUnprivilegedMac); + get_broker()->AddObserver(get_observer()); + + scoped_ptr thread( + new base::Thread("ChannelProxyTestServerThread")); + base::Thread::Options options; + options.message_loop_type = base::MessageLoop::TYPE_IO; + thread->StartWithOptions(options); + + CreateChannelProxy(get_proxy_listener(), thread->task_runner().get()); + get_broker()->DesignateBrokerCommunicationChannel(channel_proxy()); + + ASSERT_TRUE(StartClient()); + + MachPostForkSetUp(); + active_names_at_start_ = IPC::GetActiveNameCount(); + get_proxy_listener()->set_listener(get_result_listener()); + + SendMessage1(kDataBuffer1); + base::MessageLoop::current()->Run(); + + CheckChildResult(); + + // There should be no leaked names. + EXPECT_EQ(active_names_at_start_, IPC::GetActiveNameCount()); + + // Close the channel so the client's OnChannelError() gets fired. + channel_proxy()->Close(); + + EXPECT_TRUE(WaitForClientShutdown()); + DestroyChannelProxy(); +} + +void SendSharedMemoryHandleChannelProxyCallback(IPC::Sender* sender, + const IPC::Message& message) { + bool success = CheckContentsOfMessage1(message, kDataBuffer1); + SendControlMessage(sender, success); +} + +MULTIPROCESS_IPC_TEST_CLIENT_MAIN(SendSharedMemoryHandleChannelProxy) { + return CommonPrivilegedProcessMain( + &SendSharedMemoryHandleChannelProxyCallback, + "SendSharedMemoryHandleChannelProxy"); +} + } // namespace diff --git a/ipc/ipc_channel_posix.cc b/ipc/ipc_channel_posix.cc index 51461e813df5c7..85f4475c7ab761 100644 --- a/ipc/ipc_channel_posix.cc +++ b/ipc/ipc_channel_posix.cc @@ -693,8 +693,7 @@ void ChannelPosix::OnFileCanWriteWithoutBlocking(int fd) { bool ChannelPosix::ProcessMessageForDelivery(Message* message) { // Sending a brokerable attachment requires a call to Channel::Send(), so - // Send() may be re-entrant. Brokered attachments must be sent before the - // Message itself. + // Send() may be re-entrant. if (message->HasBrokerableAttachments()) { DCHECK(GetAttachmentBroker()); DCHECK(peer_pid_ != base::kNullProcessId); diff --git a/ipc/ipc_channel_reader.cc b/ipc/ipc_channel_reader.cc index de6a12818fbc47..66e3f17528c4a8 100644 --- a/ipc/ipc_channel_reader.cc +++ b/ipc/ipc_channel_reader.cc @@ -69,6 +69,12 @@ void ChannelReader::CleanUp() { } } +void ChannelReader::DispatchMessage(Message* m) { + EmitLogBeforeDispatch(*m); + listener_->OnMessageReceived(*m); + HandleDispatchError(*m); +} + bool ChannelReader::TranslateInputData(const char* input_data, int input_data_len) { const char* p; @@ -96,37 +102,9 @@ bool ChannelReader::TranslateInputData(const char* input_data, int pickle_len = static_cast(info.pickle_end - p); Message translated_message(p, pickle_len); - UMA_HISTOGRAM_MEMORY_KB( - "Memory.IPCChannelReader.ReceivedMessageSize", - static_cast(translated_message.size())); - - for (const auto& id : info.attachment_ids) - translated_message.AddPlaceholderBrokerableAttachmentWithId(id); - - if (!GetNonBrokeredAttachments(&translated_message)) + if (!HandleTranslatedMessage(&translated_message, info.attachment_ids)) return false; - // If there are no queued messages, attempt to immediately dispatch the - // newly translated message. - if (queued_messages_.empty()) { - DCHECK(blocked_ids_.empty()); - AttachmentIdSet blocked_ids = - GetBrokeredAttachments(&translated_message); - - if (blocked_ids.empty()) { - // Dispatch the message and continue the loop. - DispatchMessage(&translated_message); - p = info.message_end; - continue; - } - - blocked_ids_.swap(blocked_ids); - StartObservingAttachmentBroker(); - } - - // Make a deep copy of |translated_message| to add to the queue. - scoped_ptr m(new Message(translated_message)); - queued_messages_.push_back(m.release()); p = info.message_end; } else { // Last message is partial. @@ -156,6 +134,96 @@ bool ChannelReader::TranslateInputData(const char* input_data, return true; } +bool ChannelReader::HandleTranslatedMessage( + Message* translated_message, + const AttachmentIdVector& attachment_ids) { + UMA_HISTOGRAM_MEMORY_KB( + "Memory.IPCChannelReader.ReceivedMessageSize", + static_cast(translated_message->size())); + + // Immediately handle internal messages. + if (IsInternalMessage(*translated_message)) { + EmitLogBeforeDispatch(*translated_message); + HandleInternalMessage(*translated_message); + HandleDispatchError(*translated_message); + return true; + } + + translated_message->set_sender_pid(GetSenderPID()); + + // Immediately handle attachment broker messages. + if (DispatchAttachmentBrokerMessage(*translated_message)) { + // Ideally, the log would have been emitted prior to dispatching the + // message, but that would require this class to know more about the + // internals of attachment brokering, which should be avoided. + EmitLogBeforeDispatch(*translated_message); + HandleDispatchError(*translated_message); + return true; + } + + return HandleExternalMessage(translated_message, attachment_ids); +} + +bool ChannelReader::HandleExternalMessage( + Message* external_message, + const AttachmentIdVector& attachment_ids) { + for (const auto& id : attachment_ids) + external_message->AddPlaceholderBrokerableAttachmentWithId(id); + + if (!GetNonBrokeredAttachments(external_message)) + return false; + + // If there are no queued messages, attempt to immediately dispatch the + // newly translated message. + if (queued_messages_.empty()) { + DCHECK(blocked_ids_.empty()); + AttachmentIdSet blocked_ids = GetBrokeredAttachments(external_message); + + if (blocked_ids.empty()) { + DispatchMessage(external_message); + return true; + } + + blocked_ids_.swap(blocked_ids); + StartObservingAttachmentBroker(); + } + + // Make a deep copy of |external_message| to add to the queue. + scoped_ptr m(new Message(*external_message)); + queued_messages_.push_back(m.release()); + return true; +} + +void ChannelReader::HandleDispatchError(const Message& message) { + if (message.dispatch_error()) + listener_->OnBadMessageReceived(message); +} + +void ChannelReader::EmitLogBeforeDispatch(const Message& message) { +#ifdef IPC_MESSAGE_LOG_ENABLED + std::string name; + Logging::GetInstance()->GetMessageText(message.type(), &name, &message, NULL); + TRACE_EVENT_WITH_FLOW1("ipc,toplevel", "ChannelReader::DispatchInputData", + message.flags(), TRACE_EVENT_FLAG_FLOW_IN, "name", + name); +#else + TRACE_EVENT_WITH_FLOW2("ipc,toplevel", "ChannelReader::DispatchInputData", + message.flags(), TRACE_EVENT_FLAG_FLOW_IN, "class", + IPC_MESSAGE_ID_CLASS(message.type()), "line", + IPC_MESSAGE_ID_LINE(message.type())); +#endif +} + +bool ChannelReader::DispatchAttachmentBrokerMessage(const Message& message) { +#if USE_ATTACHMENT_BROKER + if (IsAttachmentBrokerEndpoint() && GetAttachmentBroker()) { + return GetAttachmentBroker()->OnMessageReceived(message); + } +#endif // USE_ATTACHMENT_BROKER + + return false; +} + ChannelReader::DispatchState ChannelReader::DispatchMessages() { while (!queued_messages_.empty()) { if (!blocked_ids_.empty()) @@ -176,42 +244,6 @@ ChannelReader::DispatchState ChannelReader::DispatchMessages() { return DISPATCH_FINISHED; } -void ChannelReader::DispatchMessage(Message* m) { - m->set_sender_pid(GetSenderPID()); - -#ifdef IPC_MESSAGE_LOG_ENABLED - std::string name; - Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL); - TRACE_EVENT_WITH_FLOW1("ipc,toplevel", - "ChannelReader::DispatchInputData", - m->flags(), - TRACE_EVENT_FLAG_FLOW_IN, - "name", name); -#else - TRACE_EVENT_WITH_FLOW2("ipc,toplevel", - "ChannelReader::DispatchInputData", - m->flags(), - TRACE_EVENT_FLAG_FLOW_IN, - "class", IPC_MESSAGE_ID_CLASS(m->type()), - "line", IPC_MESSAGE_ID_LINE(m->type())); -#endif - - bool handled = false; - if (IsInternalMessage(*m)) { - HandleInternalMessage(*m); - handled = true; - } -#if USE_ATTACHMENT_BROKER - if (!handled && IsAttachmentBrokerEndpoint() && GetAttachmentBroker()) { - handled = GetAttachmentBroker()->OnMessageReceived(*m); - } -#endif // USE_ATTACHMENT_BROKER - if (!handled) - listener_->OnMessageReceived(*m); - if (m->dispatch_error()) - listener_->OnBadMessageReceived(*m); -} - ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( Message* msg) { std::set blocked_ids; @@ -223,6 +255,7 @@ ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments( for (const BrokerableAttachment* attachment : brokerable_attachments_copy) { if (attachment->NeedsBrokering()) { AttachmentBroker* broker = GetAttachmentBroker(); + DCHECK(broker); scoped_refptr brokered_attachment; bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(), &brokered_attachment); diff --git a/ipc/ipc_channel_reader.h b/ipc/ipc_channel_reader.h index 46775226f9a13a..8e2fcba82c9036 100644 --- a/ipc/ipc_channel_reader.h +++ b/ipc/ipc_channel_reader.h @@ -130,16 +130,40 @@ class IPC_EXPORT ChannelReader : public SupportsAttachmentBrokering, FRIEND_TEST_ALL_PREFIXES(ChannelReaderTest, ResizeOverflowBuffer); FRIEND_TEST_ALL_PREFIXES(ChannelReaderTest, InvalidMessageSize); - typedef std::set AttachmentIdSet; - - // Takes the given data received from the IPC channel, translates it into - // Messages, and puts them in queued_messages_. - // As an optimization, after a message is translated, the message is - // immediately dispatched if able. This prevents an otherwise unnecessary deep - // copy of the message which is needed to store the message in the message - // queue. + using AttachmentIdSet = std::set; + using AttachmentIdVector = std::vector; + + // Takes the data received from the IPC channel and translates it into + // Messages. Complete messages are passed to HandleTranslatedMessage(). + // Returns |false| on unrecoverable error. bool TranslateInputData(const char* input_data, int input_data_len); + // Internal messages and messages bound for the attachment broker are + // immediately dispatched. Other messages are passed to + // HandleExternalMessage(). + // Returns |false| on unrecoverable error. + bool HandleTranslatedMessage(Message* translated_message, + const AttachmentIdVector& attachment_ids); + + // Populates the message with brokered and non-brokered attachments. If + // possible, the message is immediately dispatched. Otherwise, a deep copy of + // the message is added to |queued_messages_|. |blocked_ids_| are updated if + // necessary. + bool HandleExternalMessage(Message* external_message, + const AttachmentIdVector& attachment_ids); + + // If there was a dispatch error, informs |listener_|. + void HandleDispatchError(const Message& message); + + // Emits logging associated with a Message that is about to be dispatched. + void EmitLogBeforeDispatch(const Message& message); + + // Attachment broker messages should be dispatched out of band, since there + // are no ordering restrictions on them, and they may be required to dispatch + // the messages waiting in |queued_messages_|. + // Returns true if the attachment broker handled |message|. + bool DispatchAttachmentBrokerMessage(const Message& message); + // Dispatches messages from queued_messages_ to listeners. Successfully // dispatched messages are removed from queued_messages_. DispatchState DispatchMessages(); diff --git a/ipc/ipc_channel_win.cc b/ipc/ipc_channel_win.cc index d4640f4de1ed86..2a487ffbca3dc0 100644 --- a/ipc/ipc_channel_win.cc +++ b/ipc/ipc_channel_win.cc @@ -103,8 +103,7 @@ bool ChannelWin::Send(Message* message) { bool ChannelWin::ProcessMessageForDelivery(Message* message) { // Sending a brokerable attachment requires a call to Channel::Send(), so - // both Send() and ProcessMessageForDelivery() may be re-entrant. Brokered - // attachments must be sent before the Message itself. + // both Send() and ProcessMessageForDelivery() may be re-entrant. if (message->HasBrokerableAttachments()) { DCHECK(GetAttachmentBroker()); DCHECK(peer_pid_ != base::kNullProcessId); diff --git a/ipc/ipc_message.cc b/ipc/ipc_message.cc index 1df631ba59bcf4..2738bf13bc2540 100644 --- a/ipc/ipc_message.cc +++ b/ipc/ipc_message.cc @@ -84,6 +84,7 @@ Message::Message(const char* data, int data_len) Message::Message(const Message& other) : base::Pickle(other) { Init(); attachment_set_ = other.attachment_set_; + sender_pid_ = other.sender_pid_; } void Message::Init() { @@ -99,6 +100,7 @@ void Message::Init() { Message& Message::operator=(const Message& other) { *static_cast(this) = other; attachment_set_ = other.attachment_set_; + sender_pid_ = other.sender_pid_; return *this; }