Skip to content

Commit

Permalink
Mojo: Add basic quota API
Browse files Browse the repository at this point in the history
Introduces MojoSetQuota and MojoQueryQuota to support basic quota
management and signaling on Mojo primitives. Initially the only
quotas supported are receive queue length and memory size, only on
message pipe endpoints.

Various superfluous or overly specific test expectations around
signaling state have been adjusted to accomodate addition of a new
signal. Specifically, a lot of tests needlessly validate the exact
value of "satisfiable signals" on handles, when in reality either
those expectations already have sufficiently generic coverage
elsewhere, or the test really only cares about specific signals
being satisfiable or unsatisfiable.

Bug: 863078
Change-Id: I59217b5cfaceb790ceccd6e30443c4647c352c30
Reviewed-on: https://chromium-review.googlesource.com/1135449
Reviewed-by: Reilly Grant <reillyg@chromium.org>
Commit-Queue: Ken Rockot <rockot@chromium.org>
Cr-Commit-Position: refs/heads/master@{#574744}
  • Loading branch information
krockot authored and Commit Bot committed Jul 12, 2018
1 parent 4deabe2 commit 0db0bcf
Show file tree
Hide file tree
Showing 34 changed files with 732 additions and 68 deletions.
1 change: 1 addition & 0 deletions mojo/core/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ source_set("test_sources") {
"message_unittest.cc",
"options_validation_unittest.cc",
"platform_handle_dispatcher_unittest.cc",
"quota_unittest.cc",
"shared_buffer_dispatcher_unittest.cc",
"shared_buffer_unittest.cc",
"signals_unittest.cc",
Expand Down
28 changes: 28 additions & 0 deletions mojo/core/core.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,34 @@ MojoResult Core::AcceptInvitation(
return MOJO_RESULT_OK;
}

MojoResult Core::SetQuota(MojoHandle handle,
MojoQuotaType type,
uint64_t limit,
const MojoSetQuotaOptions* options) {
RequestContext request_context;
if (options && options->struct_size < sizeof(*options))
return MOJO_RESULT_INVALID_ARGUMENT;
auto dispatcher = GetDispatcher(handle);
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;

return dispatcher->SetQuota(type, limit);
}

MojoResult Core::QueryQuota(MojoHandle handle,
MojoQuotaType type,
const MojoQueryQuotaOptions* options,
uint64_t* limit,
uint64_t* usage) {
RequestContext request_context;
if (options && options->struct_size < sizeof(*options))
return MOJO_RESULT_INVALID_ARGUMENT;
auto dispatcher = GetDispatcher(handle);
if (!dispatcher)
return MOJO_RESULT_INVALID_ARGUMENT;
return dispatcher->QueryQuota(type, limit, usage);
}

void Core::GetActiveHandlesForTest(std::vector<MojoHandle>* handles) {
base::AutoLock lock(handles_->GetLock());
handles_->GetActiveHandlesForTest(handles);
Expand Down
12 changes: 12 additions & 0 deletions mojo/core/core.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "mojo/public/c/system/invitation.h"
#include "mojo/public/c/system/message_pipe.h"
#include "mojo/public/c/system/platform_handle.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/c/system/trap.h"
#include "mojo/public/c/system/types.h"

Expand Down Expand Up @@ -330,6 +331,17 @@ class MOJO_SYSTEM_IMPL_EXPORT Core {
const MojoAcceptInvitationOptions* options,
MojoHandle* invitation_handle);

// Quota API.
MojoResult SetQuota(MojoHandle handle,
MojoQuotaType type,
uint64_t limit,
const MojoSetQuotaOptions* options);
MojoResult QueryQuota(MojoHandle handle,
MojoQuotaType type,
const MojoQueryQuotaOptions* options,
uint64_t* limit,
uint64_t* usage);

void GetActiveHandlesForTest(std::vector<MojoHandle>* handles);

private:
Expand Down
11 changes: 5 additions & 6 deletions mojo/core/core_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ const MojoHandleSignalsState kEmptyMojoHandleSignalsState = {0u, 0u};
const MojoHandleSignalsState kFullMojoHandleSignalsState = {~0u, ~0u};
const MojoHandleSignals kAllSignals =
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE;
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE |
MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;

using CoreTest = test::CoreTestBase;

Expand Down Expand Up @@ -209,14 +210,12 @@ TEST_F(CoreTest, MessagePipe) {
// Check that |h[1]| is no longer writable (and will never be).
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss[1].satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss[1].satisfiable_signals);
EXPECT_FALSE(hss[1].satisfiable_signals & MOJO_HANDLE_SIGNAL_WRITABLE);

// Check that |h[1]| is still readable (for the moment).
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss[1].satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss[1].satisfiable_signals);
EXPECT_TRUE(hss[1].satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);

// Discard a message from |h[1]|.
ASSERT_EQ(MOJO_RESULT_OK, core()->ReadMessage(h[1], nullptr, &message));
Expand All @@ -226,7 +225,7 @@ TEST_F(CoreTest, MessagePipe) {
hss[1] = kFullMojoHandleSignalsState;
EXPECT_EQ(MOJO_RESULT_OK, core()->QueryHandleSignalsState(h[1], &hss[1]));
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss[1].satisfied_signals);
EXPECT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss[1].satisfiable_signals);
EXPECT_FALSE(hss[1].satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);

// Try writing to |h[1]|.
ASSERT_EQ(MOJO_RESULT_OK, core()->CreateMessage(nullptr, &message));
Expand Down
10 changes: 10 additions & 0 deletions mojo/core/dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ MojoResult Dispatcher::ExtractMessagePipe(base::StringPiece name,
return MOJO_RESULT_INVALID_ARGUMENT;
}

MojoResult Dispatcher::SetQuota(MojoQuotaType type, uint64_t limit) {
return MOJO_RESULT_INVALID_ARGUMENT;
}

MojoResult Dispatcher::QueryQuota(MojoQuotaType type,
uint64_t* limit,
uint64_t* usage) {
return MOJO_RESULT_INVALID_ARGUMENT;
}

HandleSignalsState Dispatcher::GetHandleSignalsState() const {
return HandleSignalsState();
}
Expand Down
7 changes: 7 additions & 0 deletions mojo/core/dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "mojo/public/c/system/buffer.h"
#include "mojo/public/c/system/data_pipe.h"
#include "mojo/public/c/system/message_pipe.h"
#include "mojo/public/c/system/quota.h"
#include "mojo/public/c/system/trap.h"
#include "mojo/public/c/system/types.h"
#include "mojo/public/cpp/platform/platform_handle.h"
Expand Down Expand Up @@ -133,6 +134,12 @@ class MOJO_SYSTEM_IMPL_EXPORT Dispatcher
virtual MojoResult ExtractMessagePipe(base::StringPiece name,
MojoHandle* message_pipe_handle);

// Quota API.
virtual MojoResult SetQuota(MojoQuotaType type, uint64_t limit);
virtual MojoResult QueryQuota(MojoQuotaType type,
uint64_t* limit,
uint64_t* usage);

///////////// General-purpose API for all handle types /////////

// Gets the current handle signals state. (The default implementation simply
Expand Down
6 changes: 4 additions & 2 deletions mojo/core/embedder_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ TEST_F(EmbedderTest, MultiprocessChannels) {
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(mp2, MOJO_HANDLE_SIGNAL_READABLE, &state));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfiable_signals);
ASSERT_FALSE(state.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
ASSERT_FALSE(state.satisfiable_signals & MOJO_HANDLE_SIGNAL_WRITABLE);

ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp2));
});
Expand Down Expand Up @@ -264,7 +265,8 @@ DEFINE_TEST_CLIENT_TEST_WITH_PIPE(MultiprocessChannelsClient,
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(mp1, MOJO_HANDLE_SIGNAL_READABLE, &state));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, state.satisfiable_signals);
ASSERT_FALSE(state.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
ASSERT_FALSE(state.satisfiable_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
ASSERT_EQ(MOJO_RESULT_OK, MojoClose(mp1));
}

Expand Down
21 changes: 20 additions & 1 deletion mojo/core/entrypoints.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "mojo/public/c/system/functions.h"
#include "mojo/public/c/system/message_pipe.h"
#include "mojo/public/c/system/platform_handle.h"
#include "mojo/public/c/system/quota.h"

namespace {

Expand Down Expand Up @@ -328,6 +329,22 @@ MojoResult MojoAcceptInvitationImpl(
invitation_handle);
}

MojoResult MojoSetQuotaImpl(MojoHandle handle,
MojoQuotaType type,
uint64_t limit,
const MojoSetQuotaOptions* options) {
return g_core->SetQuota(handle, type, limit, options);
}

MojoResult MojoQueryQuotaImpl(MojoHandle handle,
MojoQuotaType type,
const MojoQueryQuotaOptions* options,
uint64_t* current_limit,
uint64_t* current_usage) {
return g_core->QueryQuota(handle, type, options, current_limit,
current_usage);
}

} // extern "C"

MojoSystemThunks g_thunks = {sizeof(MojoSystemThunks),
Expand Down Expand Up @@ -371,7 +388,9 @@ MojoSystemThunks g_thunks = {sizeof(MojoSystemThunks),
MojoAttachMessagePipeToInvitationImpl,
MojoExtractMessagePipeFromInvitationImpl,
MojoSendInvitationImpl,
MojoAcceptInvitationImpl};
MojoAcceptInvitationImpl,
MojoSetQuotaImpl,
MojoQueryQuotaImpl};

} // namespace

Expand Down
5 changes: 5 additions & 0 deletions mojo/core/handle_signals_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ struct MOJO_SYSTEM_IMPL_EXPORT HandleSignalsState final
return satisfies_any(MOJO_HANDLE_SIGNAL_PEER_REMOTE);
}

// Indicates whether the handle has exceeded some quota limit.
bool quota_exceeded() const {
return satisfies_any(MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED);
}

// The handle will never be |readable()| again.
bool never_readable() const {
return !can_satisfy_any(MOJO_HANDLE_SIGNAL_READABLE);
Expand Down
60 changes: 59 additions & 1 deletion mojo/core/message_pipe_dispatcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,56 @@ MojoResult MessagePipeDispatcher::ReadMessage(
return MOJO_RESULT_OK;
}

MojoResult MessagePipeDispatcher::SetQuota(MojoQuotaType type, uint64_t limit) {
switch (type) {
case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH:
if (limit == MOJO_QUOTA_LIMIT_NONE)
receive_queue_length_limit_.reset();
else
receive_queue_length_limit_ = limit;
break;

case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_MEMORY_SIZE:
if (limit == MOJO_QUOTA_LIMIT_NONE)
receive_queue_memory_size_limit_.reset();
else
receive_queue_memory_size_limit_ = limit;
break;

default:
return MOJO_RESULT_INVALID_ARGUMENT;
}

return MOJO_RESULT_OK;
}

MojoResult MessagePipeDispatcher::QueryQuota(MojoQuotaType type,
uint64_t* limit,
uint64_t* usage) {
ports::PortStatus port_status;
if (node_controller_->node()->GetStatus(port_, &port_status) != ports::OK) {
CHECK(in_transit_ || port_transferred_ || port_closed_);
return MOJO_RESULT_INVALID_ARGUMENT;
}

switch (type) {
case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_LENGTH:
*limit = receive_queue_length_limit_.value_or(MOJO_QUOTA_LIMIT_NONE);
*usage = port_status.queued_message_count;
break;

case MOJO_QUOTA_TYPE_RECEIVE_QUEUE_MEMORY_SIZE:
*limit = receive_queue_memory_size_limit_.value_or(MOJO_QUOTA_LIMIT_NONE);
*usage = port_status.queued_num_bytes;
break;

default:
return MOJO_RESULT_INVALID_ARGUMENT;
}

return MOJO_RESULT_OK;
}

HandleSignalsState MessagePipeDispatcher::GetHandleSignalsState() const {
base::AutoLock lock(signal_lock_);
return GetHandleSignalsStateNoLock();
Expand Down Expand Up @@ -328,7 +378,15 @@ HandleSignalsState MessagePipeDispatcher::GetHandleSignalsStateNoLock() const {
} else {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
}
rv.satisfiable_signals |= MOJO_HANDLE_SIGNAL_PEER_CLOSED;
if (receive_queue_length_limit_ &&
port_status.queued_message_count > *receive_queue_length_limit_) {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
} else if (receive_queue_memory_size_limit_ &&
port_status.queued_num_bytes > *receive_queue_memory_size_limit_) {
rv.satisfied_signals |= MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
}
rv.satisfiable_signals |=
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;
return rv;
}

Expand Down
7 changes: 7 additions & 0 deletions mojo/core/message_pipe_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <queue>

#include "base/macros.h"
#include "base/optional.h"
#include "mojo/core/atomic_flag.h"
#include "mojo/core/dispatcher.h"
#include "mojo/core/ports/port_ref.h"
Expand Down Expand Up @@ -51,6 +52,10 @@ class MessagePipeDispatcher : public Dispatcher {
std::unique_ptr<ports::UserMessageEvent> message) override;
MojoResult ReadMessage(
std::unique_ptr<ports::UserMessageEvent>* message) override;
MojoResult SetQuota(MojoQuotaType type, uint64_t limit) override;
MojoResult QueryQuota(MojoQuotaType type,
uint64_t* limit,
uint64_t* usage) override;
HandleSignalsState GetHandleSignalsState() const override;
MojoResult AddWatcherRef(const scoped_refptr<WatcherDispatcher>& watcher,
uintptr_t context) override;
Expand Down Expand Up @@ -99,6 +104,8 @@ class MessagePipeDispatcher : public Dispatcher {
bool port_transferred_ = false;
AtomicFlag port_closed_;
WatcherSet watchers_;
base::Optional<uint64_t> receive_queue_length_limit_;
base::Optional<uint64_t> receive_queue_memory_size_limit_;

DISALLOW_COPY_AND_ASSIGN(MessagePipeDispatcher);
};
Expand Down
26 changes: 11 additions & 15 deletions mojo/core/message_pipe_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ namespace {

const MojoHandleSignals kAllSignals =
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_WRITABLE |
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE;
MOJO_HANDLE_SIGNAL_PEER_CLOSED | MOJO_HANDLE_SIGNAL_PEER_REMOTE |
MOJO_HANDLE_SIGNAL_QUOTA_EXCEEDED;

static const char kHelloWorld[] = "hello world";

Expand Down Expand Up @@ -279,29 +280,23 @@ TEST_F(MessagePipeTest, BasicWaiting) {
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(pipe1_, MOJO_HANDLE_SIGNAL_PEER_CLOSED, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
ASSERT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);
ASSERT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_PEER_CLOSED);

// Port 1 should not be writable.
// Port 1 should not be writable now or ever again.
hss = MojoHandleSignalsState();

ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(pipe1_, MOJO_HANDLE_SIGNAL_WRITABLE, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
ASSERT_FALSE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
ASSERT_FALSE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_WRITABLE);

// But it should still be readable.
hss = MojoHandleSignalsState();
ASSERT_EQ(MOJO_RESULT_OK,
WaitForSignals(pipe1_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
hss.satisfiable_signals);
ASSERT_TRUE(hss.satisfied_signals & MOJO_HANDLE_SIGNAL_READABLE);
ASSERT_TRUE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);

// Read from port 1.
buffer[0] = 0;
Expand All @@ -314,7 +309,8 @@ TEST_F(MessagePipeTest, BasicWaiting) {
ASSERT_EQ(MOJO_RESULT_FAILED_PRECONDITION,
WaitForSignals(pipe1_, MOJO_HANDLE_SIGNAL_READABLE, &hss));
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfied_signals);
ASSERT_EQ(MOJO_HANDLE_SIGNAL_PEER_CLOSED, hss.satisfiable_signals);
ASSERT_FALSE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_READABLE);
ASSERT_FALSE(hss.satisfiable_signals & MOJO_HANDLE_SIGNAL_WRITABLE);
}

#if !defined(OS_IOS)
Expand Down
Loading

0 comments on commit 0db0bcf

Please sign in to comment.