Commit 6f9453d: Merge "rpc resource accounting"
"Provide a mechanism for rpc servers to limit resource usage.  When the limit
is exceeded, rpc will stop reading from the socket, blocking the clients too.

See scylladb/scylladb#596."
avikivity committed Jan 14, 2016
2 parents b4cabcd + 909987c commit 6f9453d
Showing 3 changed files with 88 additions and 15 deletions.
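In short: a server can now be constructed with an rpc::resource_limits describing a fixed per-request overhead, a bloat factor applied to the serialized size, and a total memory budget. Once the estimated memory of in-flight requests reaches max_memory, the server stops reading from its sockets, so clients back up on TCP rather than receiving errors. A minimal usage sketch (the serializer type, myrpc and port come from the existing test below; the limit values are illustrative):

    rpc::resource_limits limits;
    limits.basic_request_size = 1000;   // fixed per-request overhead, in bytes
    limits.bloat_factor = 2;            // serialized size is scaled by this factor
    limits.max_memory = 10'000'000;     // stop reading from client sockets past this budget
    rpc::protocol<serializer>::server server(myrpc, ipv4_addr{port}, limits);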
40 changes: 38 additions & 2 deletions rpc/rpc.hh
@@ -46,6 +46,30 @@ struct SerializerConcept {

static constexpr char rpc_magic[] = "SSTARRPC";


/// \brief Resource limits for an RPC server
///
/// A request's memory use will be estimated as
///
///     req_mem = basic_request_size + sizeof(serialized_request) * bloat_factor
///
/// Concurrent requests will be limited so that
///
/// sum(req_mem) <= max_memory
///
/// \see server
struct resource_limits {
size_t basic_request_size = 0; ///< Minimum request footprint in memory
unsigned bloat_factor = 1; ///< Serialized size multiplied by this to estimate memory used by request
size_t max_memory = std::numeric_limits<size_t>::max(); ///< Maximum amount of memory that may be consumed by all requests
};

inline
size_t
estimate_request_size(const resource_limits& lim, size_t serialized_size) {
return lim.basic_request_size + serialized_size * lim.bloat_factor;
}
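// Example: with basic_request_size = 1000, bloat_factor = 2 and a 10'000-byte
// serialized request, the estimate is 1000 + 10'000 * 2 = 21'000 bytes, and that
// amount stays charged against max_memory until the reply has been sent.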

struct negotiation_frame {
char magic[sizeof(rpc_magic) - 1];
uint32_t required_features_mask;
@@ -114,20 +138,32 @@ public:
ipv4_addr peer_address() const {
return ipv4_addr(_info.addr);
}
future<> wait_for_resources(size_t memory_consumed) {
return _server._resources_available.wait(memory_consumed);
}
void release_resources(size_t memory_consumed) {
_server._resources_available.signal(memory_consumed);
}
size_t estimate_request_size(size_t serialized_size) {
return rpc::estimate_request_size(_server._limits, serialized_size);
}
};
private:
protocol& _proto;
server_socket _ss;
resource_limits _limits;
semaphore _resources_available;
std::unordered_set<connection*> _conns;
bool _stopping = false;
promise<> _ss_stopped;
public:
server(protocol& proto, ipv4_addr addr);
server(protocol& proto, server_socket);
server(protocol& proto, ipv4_addr addr, resource_limits memory_limit = resource_limits());
server(protocol& proto, server_socket, resource_limits memory_limit = resource_limits());
void accept();
future<> stop() {
_stopping = true; // prevents closed connections from being deleted from _conns
_ss.abort_accept();
_resources_available.broken();
return when_all(_ss_stopped.get_future(),
parallel_for_each(_conns, [] (connection* conn) {
return conn->stop();
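The accounting above is a plain counting semaphore denominated in bytes. A condensed sketch of the pattern in isolation (limits is assumed to be the server's resource_limits, and handle_request is a placeholder for the real unmarshal/dispatch/reply path, not part of the patch):

    semaphore resources_available(limits.max_memory);

    future<> handle_request();   // stand-in for unmarshalling, running the handler and replying

    future<> serve_one(size_t serialized_size) {
        size_t mem = estimate_request_size(limits, serialized_size);
        return resources_available.wait(mem)                      // parks the read loop when over budget
            .then([] { return handle_request(); })
            .finally([mem] { resources_available.signal(mem); }); // give the bytes back once done
    }

On shutdown, stop() above marks the semaphore as broken, so requests still waiting for memory fail promptly rather than stalling the shutdown.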
29 changes: 20 additions & 9 deletions rpc/rpc_impl.hh
@@ -409,7 +409,8 @@ protocol<Serializer, MsgType>::server::connection::respond(int64_t msg_id, sstri
}

template<typename Serializer, typename MsgType, typename... RetTypes>
inline void reply(wait_type, future<RetTypes...>&& ret, int64_t msg_id, lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client) {
inline void reply(wait_type, future<RetTypes...>&& ret, int64_t msg_id, lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client,
size_t memory_consumed) {
if (!client->error()) {
client->out_ready() = client->out_ready().then([&client = *client, msg_id, ret = std::move(ret)] () mutable {
sstring data;
@@ -431,13 +432,19 @@ inline void reply(wait_type, future<RetTypes...>&& ret, int64_t msg_id, lw_share
return client.respond(msg_id, std::move(data)).finally([&client]() {
client.get_stats_internal().pending--;
});
}).finally([client]{});
}).finally([client, memory_consumed] {
client->release_resources(memory_consumed);
});
} else {
client->release_resources(memory_consumed);
}
}

// specialization for no_wait_type which does not send a reply
template<typename Serializer, typename MsgType>
inline void reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client) {
inline void reply(no_wait_type, future<no_wait_type>&& r, int64_t msgid, lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client,
size_t memory_consumed) {
client->release_resources(memory_consumed);
try {
r.get();
} catch (std::exception& ex) {
@@ -475,11 +482,14 @@ auto recv_helper(signature<Ret (InArgs...)> sig, Func&& func, WantClientInfo wci
return [func = lref_to_cref(std::forward<Func>(func))](lw_shared_ptr<typename protocol<Serializer, MsgType>::server::connection> client,
int64_t msg_id,
temporary_buffer<char> data) mutable {
auto memory_consumed = client->estimate_request_size(data.size());
auto args = unmarshall<Serializer, InArgs...>(client->serializer(), std::move(data));
// note: apply is executed asynchronously with regards to networking so we cannot chain futures here by doing "return apply()"
apply(func, client->info(), WantClientInfo(), signature(), std::move(args)).then_wrapped(
[client, msg_id] (futurize_t<typename signature::ret_type> ret) mutable {
reply<Serializer, MsgType>(wait_style(), std::move(ret), msg_id, std::move(client));
return client->wait_for_resources(memory_consumed).then([client, msg_id, memory_consumed, args = std::move(args), func = std::forward<Func>(func)] () mutable {
apply(func, client->info(), WantClientInfo(), signature(), std::move(args)).then_wrapped(
[client, msg_id, memory_consumed] (futurize_t<typename signature::ret_type> ret) mutable {
reply<Serializer, MsgType>(wait_style(), std::move(ret), msg_id, std::move(client), memory_consumed);
});
});
};
}
@@ -586,12 +596,13 @@ auto protocol<Serializer, MsgType>::register_handler(MsgType t, Func&& func) {
}

template<typename Serializer, typename MsgType>
protocol<Serializer, MsgType>::server::server(protocol<Serializer, MsgType>& proto, ipv4_addr addr)
: server(proto, engine().listen(addr, listen_options(true)))
protocol<Serializer, MsgType>::server::server(protocol<Serializer, MsgType>& proto, ipv4_addr addr, resource_limits limits)
: server(proto, engine().listen(addr, listen_options(true)), limits)
{}

template<typename Serializer, typename MsgType>
protocol<Serializer, MsgType>::server::server(protocol<Serializer, MsgType>& proto, server_socket ss) : _proto(proto), _ss(std::move(ss))
protocol<Serializer, MsgType>::server::server(protocol<Serializer, MsgType>& proto, server_socket ss, resource_limits limits)
: _proto(proto), _ss(std::move(ss)), _limits(limits), _resources_available(limits.max_memory)
{
accept();
}
34 changes: 30 additions & 4 deletions tests/rpc.cc
@@ -66,13 +66,13 @@ inline double read(serializer, Input& input, rpc::type<double>) { return read_ar

template <typename Output>
inline void write(serializer, Output& out, const sstring& v) {
write(out, v.size());
write_arithmetic_type(out, uint32_t(v.size()));
out.write(v.c_str(), v.size());
}

template <typename Input>
inline sstring read(serializer, Input& in) {
auto size = read<size_t>(in);
inline sstring read(serializer, Input& in, rpc::type<sstring>) {
auto size = read_arithmetic_type<uint32_t>(in);
sstring ret(sstring::initialized_later(), size);
in.read(ret.begin(), size);
return ret;
@@ -117,6 +117,7 @@ int main(int ac, char** av) {
auto test10 = myrpc.make_client<long ()>(10); // receive less than replied
auto test10_1 = myrpc.make_client<future<long, int> ()>(10); // receive all
auto test11 = myrpc.make_client<future<long, rpc::optional<int>> ()>(11); // receive more than replied
auto test12 = myrpc.make_client<void (int sleep_ms, sstring payload)>(12); // large payload vs. server limits
auto test_nohandler = myrpc.make_client<void ()>(100000000); // non-existent verb
auto test_nohandler_nowait = myrpc.make_client<rpc::no_wait_type ()>(100000000); // non-existent verb, no_wait call

@@ -164,6 +165,22 @@ int main(int ac, char** av) {
});
test_nohandler_nowait(*client);
}
// delay a little for a time-sensitive test
sleep(400ms).then([test12] () mutable {
// server is configured for 10MB max, throw 25MB worth of requests at it.
auto now = std::chrono::steady_clock::now();
return parallel_for_each(boost::irange(0, 25), [test12, now] (int idx) mutable {
return test12(*client, 100, sstring(sstring::initialized_later(), 1'000'000)).then([idx, now] {
auto later = std::chrono::steady_clock::now();
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
print("idx %d completed after %d ms\n", idx, delta.count());
});
}).then([now] {
auto later = std::chrono::steady_clock::now();
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(later - now);
print("test12 completed after %d ms (should be ~300)\n", delta.count());
});
});
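// Timing check: 25 requests of ~1 MB each against a 10 MB budget admit at most
// ~10 requests at a time, so the batch drains in roughly ceil(25/10) = 3 waves
// of 100 ms each, which is where the ~300 ms expectation printed above comes from.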
f.finally([] {
sleep(1s).then([] {
client->stop().then([] {
@@ -203,8 +220,17 @@ int main(int ac, char** av) {
print("test 11\n");
return 1ul;
});
myrpc.register_handler(12, [] (int sleep_ms, sstring payload) {
return sleep(std::chrono::milliseconds(sleep_ms)).then([] {
return make_ready_future<>();
});
});

server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, ipv4_addr{port});
rpc::resource_limits limits;
limits.bloat_factor = 1;
limits.basic_request_size = 0;
limits.max_memory = 10'000'000;
server = std::make_unique<rpc::protocol<serializer>::server>(myrpc, ipv4_addr{port}, limits);
}
});

