Skip to content

Commit

Permalink
ipc: Allow attachment broker messages to be processed out of order.
Browse files Browse the repository at this point in the history
There are no ordering restrictions on messages destined for the attachment
broker, and the message may be needed to dispatch messages waiting in
|message_queue_|.

This CL introduces a small behavioral change ot ChannelReader - messages for the
attachment broker are processed immediately. This CL also includes a small
refactor of ChannelReader to split the function TranslateInputData into several,
smaller, better documented functions.

BUG=535711

Review URL: https://codereview.chromium.org/1421473004

Cr-Commit-Position: refs/heads/master@{#355411}
  • Loading branch information
erikchen authored and Commit bot committed Oct 21, 2015
1 parent aa0e86b commit 24e44d3
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 85 deletions.
78 changes: 70 additions & 8 deletions ipc/attachment_broker_mac_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,23 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase {
broker_.reset(broker);
}

// Mach Setup that needs to occur before child processes are forked.
void MachPreForkSetUp() {
service_name_ = IPC::CreateRandomServiceName();
server_port_.reset(IPC::BecomeMachServer(service_name_.c_str()).release());
}

// Mach Setup that needs to occur after child processes are forked.
void MachPostForkSetUp() {
client_port_.reset(IPC::ReceiveMachPort(server_port_.get()).release());
IPC::SendMachPort(
client_port_.get(), mach_task_self(), MACH_MSG_TYPE_COPY_SEND);
}

// Setup shared between tests.
void CommonSetUp(const char* name) {
Init(name);
service_name_ = IPC::CreateRandomServiceName();
server_port_.reset(IPC::BecomeMachServer(service_name_.c_str()).release());
MachPreForkSetUp();

if (!broker_.get())
SetBroker(new IPC::AttachmentBrokerUnprivilegedMac);
Expand All @@ -383,9 +395,7 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase {
ASSERT_TRUE(ConnectChannel());
ASSERT_TRUE(StartClient());

client_port_.reset(IPC::ReceiveMachPort(server_port_.get()).release());
IPC::SendMachPort(
client_port_.get(), mach_task_self(), MACH_MSG_TYPE_COPY_SEND);
MachPostForkSetUp();
active_names_at_start_ = IPC::GetActiveNameCount();
get_proxy_listener()->set_listener(&result_listener_);
}
Expand Down Expand Up @@ -428,6 +438,10 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase {
AttachmentBrokerObserver* get_observer() { return &observer_; }
ResultListener* get_result_listener() { return &result_listener_; }

protected:
// The number of active names immediately after set up.
mach_msg_type_number_t active_names_at_start_;

private:
ProxyListener proxy_listener_;
scoped_ptr<IPC::AttachmentBrokerUnprivilegedMac> broker_;
Expand All @@ -441,9 +455,6 @@ class IPCAttachmentBrokerMacTest : public IPCTestBase {
// process.
base::mac::ScopedMachSendRight client_port_;

// The number of active names immediately after set up.
mach_msg_type_number_t active_names_at_start_;

std::string service_name_;

ResultListener result_listener_;
Expand Down Expand Up @@ -837,4 +848,55 @@ MULTIPROCESS_IPC_TEST_CLIENT_MAIN(SendSharedMemoryHandleToSelf) {
"SendSharedMemoryHandleToSelf");
}

// Similar to SendSharedMemoryHandle, but uses a ChannelProxy instead of a
// Channel.
TEST_F(IPCAttachmentBrokerMacTest, SendSharedMemoryHandleChannelProxy) {
Init("SendSharedMemoryHandleChannelProxy");
MachPreForkSetUp();

SetBroker(new IPC::AttachmentBrokerUnprivilegedMac);
get_broker()->AddObserver(get_observer());

scoped_ptr<base::Thread> thread(
new base::Thread("ChannelProxyTestServerThread"));
base::Thread::Options options;
options.message_loop_type = base::MessageLoop::TYPE_IO;
thread->StartWithOptions(options);

CreateChannelProxy(get_proxy_listener(), thread->task_runner().get());
get_broker()->DesignateBrokerCommunicationChannel(channel_proxy());

ASSERT_TRUE(StartClient());

MachPostForkSetUp();
active_names_at_start_ = IPC::GetActiveNameCount();
get_proxy_listener()->set_listener(get_result_listener());

SendMessage1(kDataBuffer1);
base::MessageLoop::current()->Run();

CheckChildResult();

// There should be no leaked names.
EXPECT_EQ(active_names_at_start_, IPC::GetActiveNameCount());

// Close the channel so the client's OnChannelError() gets fired.
channel_proxy()->Close();

EXPECT_TRUE(WaitForClientShutdown());
DestroyChannelProxy();
}

void SendSharedMemoryHandleChannelProxyCallback(IPC::Sender* sender,
const IPC::Message& message) {
bool success = CheckContentsOfMessage1(message, kDataBuffer1);
SendControlMessage(sender, success);
}

MULTIPROCESS_IPC_TEST_CLIENT_MAIN(SendSharedMemoryHandleChannelProxy) {
return CommonPrivilegedProcessMain(
&SendSharedMemoryHandleChannelProxyCallback,
"SendSharedMemoryHandleChannelProxy");
}

} // namespace
3 changes: 1 addition & 2 deletions ipc/ipc_channel_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,7 @@ void ChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {

bool ChannelPosix::ProcessMessageForDelivery(Message* message) {
// Sending a brokerable attachment requires a call to Channel::Send(), so
// Send() may be re-entrant. Brokered attachments must be sent before the
// Message itself.
// Send() may be re-entrant.
if (message->HasBrokerableAttachments()) {
DCHECK(GetAttachmentBroker());
DCHECK(peer_pid_ != base::kNullProcessId);
Expand Down
163 changes: 98 additions & 65 deletions ipc/ipc_channel_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ void ChannelReader::CleanUp() {
}
}

void ChannelReader::DispatchMessage(Message* m) {
EmitLogBeforeDispatch(*m);
listener_->OnMessageReceived(*m);
HandleDispatchError(*m);
}

bool ChannelReader::TranslateInputData(const char* input_data,
int input_data_len) {
const char* p;
Expand Down Expand Up @@ -96,37 +102,9 @@ bool ChannelReader::TranslateInputData(const char* input_data,
int pickle_len = static_cast<int>(info.pickle_end - p);
Message translated_message(p, pickle_len);

UMA_HISTOGRAM_MEMORY_KB(
"Memory.IPCChannelReader.ReceivedMessageSize",
static_cast<base::HistogramBase::Sample>(translated_message.size()));

for (const auto& id : info.attachment_ids)
translated_message.AddPlaceholderBrokerableAttachmentWithId(id);

if (!GetNonBrokeredAttachments(&translated_message))
if (!HandleTranslatedMessage(&translated_message, info.attachment_ids))
return false;

// If there are no queued messages, attempt to immediately dispatch the
// newly translated message.
if (queued_messages_.empty()) {
DCHECK(blocked_ids_.empty());
AttachmentIdSet blocked_ids =
GetBrokeredAttachments(&translated_message);

if (blocked_ids.empty()) {
// Dispatch the message and continue the loop.
DispatchMessage(&translated_message);
p = info.message_end;
continue;
}

blocked_ids_.swap(blocked_ids);
StartObservingAttachmentBroker();
}

// Make a deep copy of |translated_message| to add to the queue.
scoped_ptr<Message> m(new Message(translated_message));
queued_messages_.push_back(m.release());
p = info.message_end;
} else {
// Last message is partial.
Expand Down Expand Up @@ -156,6 +134,96 @@ bool ChannelReader::TranslateInputData(const char* input_data,
return true;
}

bool ChannelReader::HandleTranslatedMessage(
Message* translated_message,
const AttachmentIdVector& attachment_ids) {
UMA_HISTOGRAM_MEMORY_KB(
"Memory.IPCChannelReader.ReceivedMessageSize",
static_cast<base::HistogramBase::Sample>(translated_message->size()));

// Immediately handle internal messages.
if (IsInternalMessage(*translated_message)) {
EmitLogBeforeDispatch(*translated_message);
HandleInternalMessage(*translated_message);
HandleDispatchError(*translated_message);
return true;
}

translated_message->set_sender_pid(GetSenderPID());

// Immediately handle attachment broker messages.
if (DispatchAttachmentBrokerMessage(*translated_message)) {
// Ideally, the log would have been emitted prior to dispatching the
// message, but that would require this class to know more about the
// internals of attachment brokering, which should be avoided.
EmitLogBeforeDispatch(*translated_message);
HandleDispatchError(*translated_message);
return true;
}

return HandleExternalMessage(translated_message, attachment_ids);
}

bool ChannelReader::HandleExternalMessage(
Message* external_message,
const AttachmentIdVector& attachment_ids) {
for (const auto& id : attachment_ids)
external_message->AddPlaceholderBrokerableAttachmentWithId(id);

if (!GetNonBrokeredAttachments(external_message))
return false;

// If there are no queued messages, attempt to immediately dispatch the
// newly translated message.
if (queued_messages_.empty()) {
DCHECK(blocked_ids_.empty());
AttachmentIdSet blocked_ids = GetBrokeredAttachments(external_message);

if (blocked_ids.empty()) {
DispatchMessage(external_message);
return true;
}

blocked_ids_.swap(blocked_ids);
StartObservingAttachmentBroker();
}

// Make a deep copy of |external_message| to add to the queue.
scoped_ptr<Message> m(new Message(*external_message));
queued_messages_.push_back(m.release());
return true;
}

void ChannelReader::HandleDispatchError(const Message& message) {
if (message.dispatch_error())
listener_->OnBadMessageReceived(message);
}

void ChannelReader::EmitLogBeforeDispatch(const Message& message) {
#ifdef IPC_MESSAGE_LOG_ENABLED
std::string name;
Logging::GetInstance()->GetMessageText(message.type(), &name, &message, NULL);
TRACE_EVENT_WITH_FLOW1("ipc,toplevel", "ChannelReader::DispatchInputData",
message.flags(), TRACE_EVENT_FLAG_FLOW_IN, "name",
name);
#else
TRACE_EVENT_WITH_FLOW2("ipc,toplevel", "ChannelReader::DispatchInputData",
message.flags(), TRACE_EVENT_FLAG_FLOW_IN, "class",
IPC_MESSAGE_ID_CLASS(message.type()), "line",
IPC_MESSAGE_ID_LINE(message.type()));
#endif
}

bool ChannelReader::DispatchAttachmentBrokerMessage(const Message& message) {
#if USE_ATTACHMENT_BROKER
if (IsAttachmentBrokerEndpoint() && GetAttachmentBroker()) {
return GetAttachmentBroker()->OnMessageReceived(message);
}
#endif // USE_ATTACHMENT_BROKER

return false;
}

ChannelReader::DispatchState ChannelReader::DispatchMessages() {
while (!queued_messages_.empty()) {
if (!blocked_ids_.empty())
Expand All @@ -176,42 +244,6 @@ ChannelReader::DispatchState ChannelReader::DispatchMessages() {
return DISPATCH_FINISHED;
}

void ChannelReader::DispatchMessage(Message* m) {
m->set_sender_pid(GetSenderPID());

#ifdef IPC_MESSAGE_LOG_ENABLED
std::string name;
Logging::GetInstance()->GetMessageText(m->type(), &name, m, NULL);
TRACE_EVENT_WITH_FLOW1("ipc,toplevel",
"ChannelReader::DispatchInputData",
m->flags(),
TRACE_EVENT_FLAG_FLOW_IN,
"name", name);
#else
TRACE_EVENT_WITH_FLOW2("ipc,toplevel",
"ChannelReader::DispatchInputData",
m->flags(),
TRACE_EVENT_FLAG_FLOW_IN,
"class", IPC_MESSAGE_ID_CLASS(m->type()),
"line", IPC_MESSAGE_ID_LINE(m->type()));
#endif

bool handled = false;
if (IsInternalMessage(*m)) {
HandleInternalMessage(*m);
handled = true;
}
#if USE_ATTACHMENT_BROKER
if (!handled && IsAttachmentBrokerEndpoint() && GetAttachmentBroker()) {
handled = GetAttachmentBroker()->OnMessageReceived(*m);
}
#endif // USE_ATTACHMENT_BROKER
if (!handled)
listener_->OnMessageReceived(*m);
if (m->dispatch_error())
listener_->OnBadMessageReceived(*m);
}

ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
Message* msg) {
std::set<BrokerableAttachment::AttachmentId> blocked_ids;
Expand All @@ -223,6 +255,7 @@ ChannelReader::AttachmentIdSet ChannelReader::GetBrokeredAttachments(
for (const BrokerableAttachment* attachment : brokerable_attachments_copy) {
if (attachment->NeedsBrokering()) {
AttachmentBroker* broker = GetAttachmentBroker();
DCHECK(broker);
scoped_refptr<BrokerableAttachment> brokered_attachment;
bool result = broker->GetAttachmentWithId(attachment->GetIdentifier(),
&brokered_attachment);
Expand Down
40 changes: 32 additions & 8 deletions ipc/ipc_channel_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,40 @@ class IPC_EXPORT ChannelReader : public SupportsAttachmentBrokering,
FRIEND_TEST_ALL_PREFIXES(ChannelReaderTest, ResizeOverflowBuffer);
FRIEND_TEST_ALL_PREFIXES(ChannelReaderTest, InvalidMessageSize);

typedef std::set<BrokerableAttachment::AttachmentId> AttachmentIdSet;

// Takes the given data received from the IPC channel, translates it into
// Messages, and puts them in queued_messages_.
// As an optimization, after a message is translated, the message is
// immediately dispatched if able. This prevents an otherwise unnecessary deep
// copy of the message which is needed to store the message in the message
// queue.
using AttachmentIdSet = std::set<BrokerableAttachment::AttachmentId>;
using AttachmentIdVector = std::vector<BrokerableAttachment::AttachmentId>;

// Takes the data received from the IPC channel and translates it into
// Messages. Complete messages are passed to HandleTranslatedMessage().
// Returns |false| on unrecoverable error.
bool TranslateInputData(const char* input_data, int input_data_len);

// Internal messages and messages bound for the attachment broker are
// immediately dispatched. Other messages are passed to
// HandleExternalMessage().
// Returns |false| on unrecoverable error.
bool HandleTranslatedMessage(Message* translated_message,
const AttachmentIdVector& attachment_ids);

// Populates the message with brokered and non-brokered attachments. If
// possible, the message is immediately dispatched. Otherwise, a deep copy of
// the message is added to |queued_messages_|. |blocked_ids_| are updated if
// necessary.
bool HandleExternalMessage(Message* external_message,
const AttachmentIdVector& attachment_ids);

// If there was a dispatch error, informs |listener_|.
void HandleDispatchError(const Message& message);

// Emits logging associated with a Message that is about to be dispatched.
void EmitLogBeforeDispatch(const Message& message);

// Attachment broker messages should be dispatched out of band, since there
// are no ordering restrictions on them, and they may be required to dispatch
// the messages waiting in |queued_messages_|.
// Returns true if the attachment broker handled |message|.
bool DispatchAttachmentBrokerMessage(const Message& message);

// Dispatches messages from queued_messages_ to listeners. Successfully
// dispatched messages are removed from queued_messages_.
DispatchState DispatchMessages();
Expand Down
3 changes: 1 addition & 2 deletions ipc/ipc_channel_win.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ bool ChannelWin::Send(Message* message) {

bool ChannelWin::ProcessMessageForDelivery(Message* message) {
// Sending a brokerable attachment requires a call to Channel::Send(), so
// both Send() and ProcessMessageForDelivery() may be re-entrant. Brokered
// attachments must be sent before the Message itself.
// both Send() and ProcessMessageForDelivery() may be re-entrant.
if (message->HasBrokerableAttachments()) {
DCHECK(GetAttachmentBroker());
DCHECK(peer_pid_ != base::kNullProcessId);
Expand Down
2 changes: 2 additions & 0 deletions ipc/ipc_message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Message::Message(const char* data, int data_len)
Message::Message(const Message& other) : base::Pickle(other) {
Init();
attachment_set_ = other.attachment_set_;
sender_pid_ = other.sender_pid_;
}

void Message::Init() {
Expand All @@ -99,6 +100,7 @@ void Message::Init() {
Message& Message::operator=(const Message& other) {
*static_cast<base::Pickle*>(this) = other;
attachment_set_ = other.attachment_set_;
sender_pid_ = other.sender_pid_;
return *this;
}

Expand Down

0 comments on commit 24e44d3

Please sign in to comment.