Skip to content

Commit

Permalink
future: add repeat_until_value(): repeat an action until it returns a…
Browse files Browse the repository at this point in the history
… value

Given an AsyncAction functor a that returns future<optional<T>>,
repeat_until_value(a) will call a repeatedly until it returns an engaged
optional, and return the value as a future<T>.

This is useful for similar to repeat(), except that the "yes" return
(indicated by an engaged optional) also has an associated value, which
is returned to the caller.
  • Loading branch information
avikivity committed Dec 9, 2015
1 parent 9f9182e commit 5dc22fa
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
74 changes: 74 additions & 0 deletions core/future-util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,80 @@ future<> repeat(AsyncAction&& action) {
return f;
}

/// \cond internal

template <typename T>
struct repeat_until_value_type_helper;

/// \endcond

/// Type helper for repeat_until_value()
template <typename T>
struct repeat_until_value_type_helper<future<std::experimental::optional<T>>> {
/// The type of the value we are computing
using value_type = T;
/// Type used by \c AsyncAction while looping
using optional_type = std::experimental::optional<T>;
/// Return type of repeat_until_value()
using future_type = future<value_type>;
/// Return type of \c AsyncAction
using future_optional_type = future<optional_type>;
};

/// Return value of repeat_until_value()
template <typename AsyncAction>
using repeat_until_value_return_type
= typename repeat_until_value_type_helper<std::result_of_t<AsyncAction()>>::future_type;

/// Invokes given action until it fails or the function requests iteration to stop by returning
/// an engaged \c future<std::experimental::optional<T>>. The value is extracted from the
/// \c optional, and returned, as a future, from repeat_until_value().
///
/// \param action a callable taking no arguments, returning a future<std::experimental::optional<T>>.
/// Will be called again as soon as the future resolves, unless the
/// future fails, action throws, or it resolves with an engaged \c optional.
/// If \c action is an r-value it can be moved in the middle of iteration.
/// \return a ready future if we stopped successfully, or a failed future if
/// a call to to \c action failed. The \c optional's value is returned.
template<typename AsyncAction>
repeat_until_value_return_type<AsyncAction>
repeat_until_value(AsyncAction&& action) {
using type_helper = repeat_until_value_type_helper<std::result_of_t<AsyncAction()>>;
// the "T" in the documentation
using value_type = typename type_helper::value_type;
using optional_type = typename type_helper::optional_type;
using futurator = futurize<typename type_helper::future_optional_type>;
do {
auto f = futurator::apply(action);

if (!f.available()) {
return f.then([action = std::forward<AsyncAction>(action)] (auto&& optional) mutable {
if (optional) {
return make_ready_future<value_type>(std::move(optional).value());
} else {
return repeat_until_value(std::forward<AsyncAction>(action));
}
});
}

if (f.failed()) {
return make_exception_future<value_type>(f.get_exception());
}

optional_type&& optional = std::move(f).get0();
if (optional) {
return make_ready_future<value_type>(std::move(optional).value());
}
} while (++future_avail_count % max_inlined_continuations);

promise<value_type> p;
auto f = p.get_future();
schedule(make_task([action = std::forward<AsyncAction>(action), p = std::move(p)] () mutable {
repeat_until_value(std::forward<AsyncAction>(action)).forward_to(std::move(p));
}));
return f;
}

/// Invokes given action until it fails or given condition evaluates to true.
///
/// \param stop_cond a callable taking no arguments, returning a boolean that
Expand Down
18 changes: 18 additions & 0 deletions tests/futures_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,21 @@ SEASTAR_TEST_CASE(test_futurize_from_tuple) {
BOOST_REQUIRE(futurize<void>::from_tuple(v2).get() == v2);
return make_ready_future<>();
}

SEASTAR_TEST_CASE(test_repeat_until_value) {
namespace stdx = std::experimental;
return do_with(int(), [] (int& counter) {
return repeat_until_value([&counter] () -> future<stdx::optional<int>> {
if (counter == 10000) {
return make_ready_future<stdx::optional<int>>(counter);
} else {
++counter;
return make_ready_future<stdx::optional<int>>(stdx::nullopt);
}
}).then([&counter] (int result) {
BOOST_REQUIRE(counter == 10000);
BOOST_REQUIRE(result == counter);
});
});
}

0 comments on commit 5dc22fa

Please sign in to comment.