Skip to content

Commit

Permalink
Merge "Various improvements to execution stages" from Paweł
Browse files Browse the repository at this point in the history
"These patches add individual metrics for execution stages, fixes problems
with creation of execution stages and adds more tests."

* tag 'pdziepak/execution-stages-improvements/v4' of github.com:cloudius-systems/seastar-dev:
  tests/execution_stage: test that unique stage name is required
  test/execution_stage: add more tests for creating stages
  execution_stage: fix stage creation from lreference to function object
  tests/execution_stage: add test for metrics
  execution_stages: add metrics
  metrics_registration: make metric_groups movable
  • Loading branch information
avikivity committed Mar 25, 2017
2 parents f8fa1f3 + 123fc80 commit 2ebe842
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 18 deletions.
95 changes: 84 additions & 11 deletions core/execution_stage.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "future.hh"
#include "chunked_fifo.hh"
#include "function_traits.hh"
#include "sstring.hh"
#include "metrics.hh"
#include "util/reference_wrapper.hh"
#include "util/gcc6-concepts.hh"

Expand Down Expand Up @@ -107,13 +109,23 @@ std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {

/// Base execution stage class
class execution_stage {
public:
struct stats {
uint64_t tasks_scheduled = 0;
uint64_t tasks_preempted = 0;
uint64_t function_calls_enqueued = 0;
uint64_t function_calls_executed = 0;
};
protected:
bool _empty = true;
bool _flush_scheduled = false;
stats _stats;
sstring _name;
metrics::metric_group _metric_group;
protected:
virtual void do_flush() noexcept = 0;
public:
execution_stage();
explicit execution_stage(const sstring& name);
virtual ~execution_stage();

execution_stage(const execution_stage&) = delete;
Expand All @@ -126,6 +138,12 @@ public:
/// optimisation which is required by make_execution_stage().
execution_stage(execution_stage&&);

/// Returns execution stage name
const sstring& name() const noexcept { return _name; }

/// Returns execution stage usage statistics
const stats& get_stats() const noexcept { return _stats; }

/// Flushes execution stage
///
/// Ensures that a task which would execute all queued operations is
Expand All @@ -137,6 +155,7 @@ public:
if (_empty || _flush_scheduled) {
return false;
}
_stats.tasks_scheduled++;
schedule(make_task([this] {
do_flush();
_flush_scheduled = false;
Expand All @@ -158,21 +177,37 @@ namespace internal {

class execution_stage_manager {
std::vector<execution_stage*> _execution_stages;
std::unordered_map<sstring, execution_stage*> _stages_by_name;
private:
execution_stage_manager() = default;
execution_stage_manager(const execution_stage_manager&) = delete;
execution_stage_manager(execution_stage_manager&&) = delete;
public:
void register_execution_stage(execution_stage& stage) {
_execution_stages.push_back(&stage);
auto ret = _stages_by_name.emplace(stage.name(), &stage);
if (!ret.second) {
throw std::invalid_argument(sprint("Execution stage %s already exists.", stage.name()));
}
try {
_execution_stages.push_back(&stage);
} catch (...) {
_stages_by_name.erase(stage.name());
throw;
}
}
void unregister_execution_stage(execution_stage& stage) noexcept {
auto it = std::find(_execution_stages.begin(), _execution_stages.end(), &stage);
_execution_stages.erase(it);
_stages_by_name.erase(stage.name());
}
void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept {
auto it = std::find(_execution_stages.begin(), _execution_stages.end(), &old_es);
*it = &new_es;
_stages_by_name.find(new_es.name())->second = &new_es;
}

execution_stage* get_stage(const sstring& name) {
return _stages_by_name[name];
}

bool flush() noexcept {
Expand Down Expand Up @@ -247,15 +282,20 @@ private:
auto& wi = _queue.front();
futurize<ReturnType>::apply(_function, unwrap(std::move(wi._in))).forward_to(std::move(wi._ready));
_queue.pop_front();
_stats.function_calls_executed++;

if (need_preempt()) {
_stats.tasks_preempted++;
break;
}
}
_empty = _queue.empty();
}
public:
explicit concrete_execution_stage(Function f) : _function(std::move(f)) {
explicit concrete_execution_stage(const sstring& name, Function f)
: execution_stage(name)
, _function(std::move(f))
{
_queue.reserve(flush_threshold);
}

Expand Down Expand Up @@ -285,6 +325,7 @@ public:
return_type operator()(Args&&... args) {
_queue.emplace_back(std::forward<Args>(args)...);
_empty = false;
_stats.function_calls_enqueued++;
auto f = _queue.back()._ready.get_future();
if (_queue.size() > flush_threshold) {
flush();
Expand Down Expand Up @@ -319,13 +360,14 @@ public:
/// }
/// ```
///
/// \param name unique name of the execution stage
/// \param fn function to be executed by the stage
/// \return concrete_execution_stage
template<typename Function>
auto make_execution_stage(Function&& fn) {
auto make_execution_stage(const sstring& name, Function&& fn) {
using traits = function_traits<Function>;
return concrete_execution_stage<Function, typename traits::return_type,
typename traits::args_as_tuple>(std::forward<Function>(fn));
return concrete_execution_stage<std::decay_t<Function>, typename traits::return_type,
typename traits::args_as_tuple>(name, std::forward<Function>(fn));
}

/// Creates a new execution stage from a member function
Expand All @@ -348,21 +390,49 @@ auto make_execution_stage(Function&& fn) {
/// ```
///
/// \see make_execution_stage(Function&&)
/// \param name unique name of the execution stage
/// \param fn member function to be executed by the stage
/// \return concrete_execution_stage
template<typename Ret, typename Object, typename... Args>
auto make_execution_stage(Ret (Object::*fn)(Args...)) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<Object*, Args...>>(std::mem_fn(fn));
auto make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<Object*, Args...>>(name, std::mem_fn(fn));
}

template<typename Ret, typename Object, typename... Args>
auto make_execution_stage(Ret (Object::*fn)(Args...) const) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<const Object*, Args...>>(std::mem_fn(fn));
auto make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
return concrete_execution_stage<decltype(std::mem_fn(fn)), Ret, std::tuple<const Object*, Args...>>(name, std::mem_fn(fn));
}

/// @}

inline execution_stage::execution_stage()
inline execution_stage::execution_stage(const sstring& name)
: _name(name)
, _metric_group("execution_stages", {
metrics::make_derive("tasks_scheduled",
metrics::description("Counts tasks scheduled by execution stages"),
{ metrics::label_instance("execution_stage", name), },
[name, &esm = internal::execution_stage_manager::get()] {
return esm.get_stage(name)->get_stats().tasks_scheduled;
}),
metrics::make_derive("tasks_preempted",
metrics::description("Counts tasks which were preempted before execution all queued operations"),
{ metrics::label_instance("execution_stage", name), },
[name, &esm = internal::execution_stage_manager::get()] {
return esm.get_stage(name)->get_stats().tasks_preempted;
}),
metrics::make_derive("function_calls_enqueued",
metrics::description("Counts function calls added to execution stages queues"),
{ metrics::label_instance("execution_stage", name), },
[name, &esm = internal::execution_stage_manager::get()] {
return esm.get_stage(name)->get_stats().function_calls_enqueued;
}),
metrics::make_derive("function_calls_executed",
metrics::description("Counts function calls executed by execution stages"),
{ metrics::label_instance("execution_stage", name), },
[name, &esm = internal::execution_stage_manager::get()] {
return esm.get_stage(name)->get_stats().function_calls_executed;
}),
})
{
internal::execution_stage_manager::get().register_execution_stage(*this);
}
Expand All @@ -373,6 +443,9 @@ inline execution_stage::~execution_stage()
}

inline execution_stage::execution_stage(execution_stage&& other)
: _stats(other._stats)
, _name(std::move(other._name))
, _metric_group(std::move(other._metric_group))
{
internal::execution_stage_manager::get().update_execution_stage_registration(other, *this);
}
Expand Down
2 changes: 2 additions & 0 deletions core/metrics_registration.hh
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class metric_groups {
std::unique_ptr<impl::metric_groups_def> _impl;
public:
metric_groups() noexcept;
metric_groups(metric_groups&&) = default;
virtual ~metric_groups();
/*!
* \brief add metrics belong to the same group in the constructor.
Expand Down Expand Up @@ -127,6 +128,7 @@ class metric_group : public metric_groups {
public:
metric_group() noexcept;
metric_group(const metric_group&) = delete;
metric_group(metric_group&&) = default;
virtual ~metric_group();

/*!
Expand Down
79 changes: 72 additions & 7 deletions tests/execution_stage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,37 @@

static std::random_device rd;

SEASTAR_TEST_CASE(test_create_stage_from_lvalue_function_object) {
return seastar::async([] {
auto dont_move = [obj = make_shared<int>(53)] { return *obj; };
auto stage = seastar::make_execution_stage("test", dont_move);
BOOST_REQUIRE_EQUAL(stage().get0(), 53);
BOOST_REQUIRE_EQUAL(dont_move(), 53);
});
}

SEASTAR_TEST_CASE(test_create_stage_from_rvalue_function_object) {
return seastar::async([] {
auto dont_copy = [obj = std::make_unique<int>(42)] { return *obj; };
auto stage = seastar::make_execution_stage("test", std::move(dont_copy));
BOOST_REQUIRE_EQUAL(stage().get0(), 42);
});
}

int func() {
return 64;
}

SEASTAR_TEST_CASE(test_create_stage_from_function) {
return seastar::async([] {
auto stage = seastar::make_execution_stage("test", func);
BOOST_REQUIRE_EQUAL(stage().get0(), 64);
});
}

template<typename Function, typename Verify>
void test_simple_execution_stage(Function&& func, Verify&& verify) {
auto stage = seastar::make_execution_stage(std::forward<Function>(func));
auto stage = seastar::make_execution_stage("test", std::forward<Function>(func));

std::vector<int> vs;
std::default_random_engine gen(rd());
Expand Down Expand Up @@ -86,7 +114,7 @@ SEASTAR_TEST_CASE(test_simple_stage_returning_future_int) {

template<typename T>
void test_execution_stage_avoids_copy() {
auto stage = seastar::make_execution_stage([] (T obj) {
auto stage = seastar::make_execution_stage("test", [] (T obj) {
return std::move(obj);
});

Expand Down Expand Up @@ -123,7 +151,7 @@ SEASTAR_TEST_CASE(test_stage_prefers_move_to_copy) {

SEASTAR_TEST_CASE(test_rref_decays_to_value) {
return seastar::async([] {
auto stage = seastar::make_execution_stage([] (std::vector<int>&& vec) {
auto stage = seastar::make_execution_stage("test", [] (std::vector<int>&& vec) {
return vec.size();
});

Expand All @@ -143,7 +171,7 @@ SEASTAR_TEST_CASE(test_rref_decays_to_value) {

SEASTAR_TEST_CASE(test_lref_does_not_decay) {
return seastar::async([] {
auto stage = seastar::make_execution_stage([] (int& v) {
auto stage = seastar::make_execution_stage("test", [] (int& v) {
v++;
});

Expand All @@ -163,7 +191,7 @@ SEASTAR_TEST_CASE(test_lref_does_not_decay) {

SEASTAR_TEST_CASE(test_explicit_reference_wrapper_is_not_unwrapped) {
return seastar::async([] {
auto stage = seastar::make_execution_stage([] (seastar::reference_wrapper<int> v) {
auto stage = seastar::make_execution_stage("test", [] (seastar::reference_wrapper<int> v) {
v.get()++;
});

Expand All @@ -190,7 +218,7 @@ SEASTAR_TEST_CASE(test_function_is_class_member) {
}
};

auto stage = seastar::make_execution_stage(&foo::member);
auto stage = seastar::make_execution_stage("test", &foo::member);

foo object;
std::vector<future<int>> fs;
Expand All @@ -213,9 +241,46 @@ SEASTAR_TEST_CASE(test_function_is_const_class_member) {
return value;
}
};
auto stage = seastar::make_execution_stage(&foo::member);
auto stage = seastar::make_execution_stage("test", &foo::member);

const foo object;
BOOST_REQUIRE_EQUAL(stage(&object).get0(), 999);
});
}

SEASTAR_TEST_CASE(test_stage_stats) {
return seastar::async([] {
auto stage = seastar::make_execution_stage("test", [] { });

BOOST_REQUIRE_EQUAL(stage.get_stats().function_calls_enqueued, 0);
BOOST_REQUIRE_EQUAL(stage.get_stats().function_calls_executed, 0);

auto fs = std::vector<future<>>();
static constexpr auto call_count = 53;
for (auto i = 0; i < call_count; i++) {
fs.emplace_back(stage());
}

BOOST_REQUIRE_EQUAL(stage.get_stats().function_calls_enqueued, call_count);

for (auto i = 0; i < call_count; i++) {
fs[i].get();
BOOST_REQUIRE_GE(stage.get_stats().tasks_scheduled, 1);
BOOST_REQUIRE_GE(stage.get_stats().function_calls_executed, i);
}
BOOST_REQUIRE_EQUAL(stage.get_stats().function_calls_executed, call_count);
});
}

SEASTAR_TEST_CASE(test_unique_stage_names_are_enforced) {
return seastar::async([] {
{
auto stage = seastar::make_execution_stage("test", [] {});
BOOST_REQUIRE_THROW(seastar::make_execution_stage("test", [] {}), std::invalid_argument);
stage().get();
}

auto stage = seastar::make_execution_stage("test", [] {});
stage().get();
});
}

0 comments on commit 2ebe842

Please sign in to comment.