Skip to content

Commit

Permalink
Merge "Introduce when_all_succeed()" from Paweł
Browse files Browse the repository at this point in the history
"This two patches introduce when_all_succeed(), which is a higher
level version of when_all() that deals correctly with the exceptions
so that the user code doesn't need any boiler-plate code in order to
avoid silently ignored failed futures. Moreover, if there were no errors
it also extracts the values from futures.

It is not a replacement of when_all() as it doesn't allow for more
flexible handling of exceptions originating from different sources, but
in many cases it is a more convenient and less error-prone alternative."

* 'pdziepak/when-all-succeed/v2' of github.com:cloudius-systems/seastar-dev:
  tests/futures_test: add tests for when_all_succeed()
  future-util: introduce when_all_succeed()
  utils: add helpers for dealing with tuples
  when_all: allow transforming the final result
  • Loading branch information
avikivity committed Feb 2, 2017
2 parents 71b2b8f + 6609d55 commit f07f8ed
Show file tree
Hide file tree
Showing 3 changed files with 467 additions and 19 deletions.
237 changes: 218 additions & 19 deletions core/future-util.hh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <iterator>
#include <vector>
#include <experimental/optional>
#include "util/tuple_utils.hh"

/// \cond internal
extern __thread size_t task_quota;
Expand Down Expand Up @@ -390,15 +391,30 @@ future<> do_for_each(Container& c, AsyncAction&& action) {
}

/// \cond internal
namespace seastar {
namespace internal {

template<typename... Futures>
class when_all_state : public enable_lw_shared_from_this<when_all_state<Futures...>> {
struct identity_futures_tuple {
using future_type = future<std::tuple<Futures...>>;
using promise_type = typename future_type::promise_type;

static void set_promise(promise_type& p, std::tuple<Futures...> futures) {
p.set_value(std::move(futures));
}
};

template<typename ResolvedTupleTransform, typename... Futures>
class when_all_state : public enable_lw_shared_from_this<when_all_state<ResolvedTupleTransform, Futures...>> {
using type = std::tuple<Futures...>;
type tuple;
promise<type> p;
public:
typename ResolvedTupleTransform::promise_type p;
when_all_state(Futures&&... t) : tuple(std::make_tuple(std::move(t)...)) {}
~when_all_state() {
p.set_value(std::move(tuple));
ResolvedTupleTransform::set_promise(p, std::move(tuple));
}
private:
template<size_t Idx>
int wait() {
auto& f = std::get<Idx>(tuple);
Expand All @@ -410,16 +426,16 @@ class when_all_state : public enable_lw_shared_from_this<when_all_state<Futures.
}
return 0;
}
public:
template <size_t... Idx>
future<type> wait_all(std::index_sequence<Idx...>) {
typename ResolvedTupleTransform::future_type wait_all(std::index_sequence<Idx...>) {
[] (...) {} (this->template wait<Idx>()...);
return p.get_future();
}
template <typename... Futs>
friend future<std::tuple<Futs...>> when_all(Futs&&... futs);
template<typename U>
friend class lw_shared_ptr;
};

}
}
/// \endcond

/// Wait for many futures to complete, capturing possible errors (variadic version).
Expand All @@ -435,11 +451,16 @@ template <typename... Futs>
inline
future<std::tuple<Futs...>>
when_all(Futs&&... futs) {
auto s = make_lw_shared<when_all_state<Futs...>>(std::forward<Futs>(futs)...);
namespace si = seastar::internal;
using state = si::when_all_state<si::identity_futures_tuple<Futs...>, Futs...>;
auto s = make_lw_shared<state>(std::forward<Futs>(futs)...);
return s->wait_all(std::make_index_sequence<sizeof...(Futs)>());
}

/// \cond internal
namespace seastar {
namespace internal {

template <typename Iterator, typename IteratorCategory>
inline
size_t
Expand All @@ -456,25 +477,48 @@ when_all_estimate_vector_capacity(Iterator begin, Iterator end, std::forward_ite
return std::distance(begin, end);
}

template<typename Future>
struct identity_futures_vector {
using future_type = future<std::vector<Future>>;
static future_type run(std::vector<Future> futures) {
return make_ready_future<std::vector<Future>>(std::move(futures));
}
};

// Internal function for when_all().
template <typename Future>
template <typename ResolvedVectorTransform, typename Future>
inline
future<std::vector<Future>>
typename ResolvedVectorTransform::future_type
complete_when_all(std::vector<Future>&& futures, typename std::vector<Future>::iterator pos) {
// If any futures are already ready, skip them.
while (pos != futures.end() && pos->available()) {
++pos;
}
// Done?
if (pos == futures.end()) {
return make_ready_future<std::vector<Future>>(std::move(futures));
return ResolvedVectorTransform::run(std::move(futures));
}
// Wait for unready future, store, and continue.
return pos->then_wrapped([futures = std::move(futures), pos] (auto fut) mutable {
*pos++ = std::move(fut);
return complete_when_all(std::move(futures), pos);
return complete_when_all<ResolvedVectorTransform>(std::move(futures), pos);
});
}

template<typename ResolvedVectorTransform, typename FutureIterator>
inline auto
do_when_all(FutureIterator begin, FutureIterator end) {
using itraits = std::iterator_traits<FutureIterator>;
std::vector<typename itraits::value_type> ret;
ret.reserve(when_all_estimate_vector_capacity(begin, end, typename itraits::iterator_category()));
// Important to invoke the *begin here, in case it's a function iterator,
// so we launch all computation in parallel.
std::move(begin, end, std::back_inserter(ret));
return complete_when_all<ResolvedVectorTransform>(std::move(ret), ret.begin());
}

}
}
/// \endcond

/// Wait for many futures to complete, capturing possible errors (iterator version).
Expand All @@ -491,13 +535,10 @@ template <typename FutureIterator>
inline
future<std::vector<typename std::iterator_traits<FutureIterator>::value_type>>
when_all(FutureIterator begin, FutureIterator end) {
namespace si = seastar::internal;
using itraits = std::iterator_traits<FutureIterator>;
std::vector<typename itraits::value_type> ret;
ret.reserve(when_all_estimate_vector_capacity(begin, end, typename itraits::iterator_category()));
// Important to invoke the *begin here, in case it's a function iterator,
// so we launch all computation in parallel.
std::move(begin, end, std::back_inserter(ret));
return complete_when_all(std::move(ret), ret.begin());
using result_transform = si::identity_futures_vector<typename itraits::value_type>;
return si::do_when_all<result_transform>(std::move(begin), std::move(end));
}

template <typename T, bool IsFuture>
Expand Down Expand Up @@ -746,6 +787,164 @@ future<T...> with_timeout(std::chrono::time_point<Clock, Duration> timeout, futu
return result;
}

namespace seastar {

namespace internal {

template<typename Future>
struct future_has_value {
enum {
value = !std::is_same<std::decay_t<Future>, future<>>::value
};
};

template<typename Tuple>
struct tuple_to_future;

template<typename... Elements>
struct tuple_to_future<std::tuple<Elements...>> {
using type = future<Elements...>;
using promise_type = promise<Elements...>;

static auto make_ready(std::tuple<Elements...> t) {
auto create_future = [] (auto&&... args) {
return make_ready_future<Elements...>(std::move(args)...);
};
return apply(create_future, std::move(t));
}

static auto make_failed(std::exception_ptr excp) {
return make_exception_future<Elements...>(std::move(excp));
}
};

template<typename... Futures>
class extract_values_from_futures_tuple {
static auto transform(std::tuple<Futures...> futures) {
auto prepare_result = [] (auto futures) {
auto fs = tuple_filter_by_type<internal::future_has_value>(std::move(futures));
return tuple_map(std::move(fs), [] (auto&& e) {
return internal::untuple(e.get());
});
};

using tuple_futurizer = internal::tuple_to_future<decltype(prepare_result(std::move(futures)))>;

std::exception_ptr excp;
tuple_for_each(futures, [&excp] (auto& f) {
if (!excp) {
if (f.failed()) {
excp = f.get_exception();
}
} else {
f.ignore_ready_future();
}
});
if (excp) {
return tuple_futurizer::make_failed(std::move(excp));
}

return tuple_futurizer::make_ready(prepare_result(std::move(futures)));
}
public:
using future_type = decltype(transform(std::declval<std::tuple<Futures...>>()));
using promise_type = typename future_type::promise_type;

static void set_promise(promise_type& p, std::tuple<Futures...> tuple) {
transform(std::move(tuple)).forward_to(std::move(p));
}
};

template<typename Future>
struct extract_values_from_futures_vector {
using value_type = decltype(untuple(std::declval<typename Future::value_type>()));

using future_type = future<std::vector<value_type>>;

static future_type run(std::vector<Future> futures) {
std::vector<value_type> values;
values.reserve(futures.size());

std::exception_ptr excp;
for (auto&& f : futures) {
if (!excp) {
if (f.failed()) {
excp = f.get_exception();
} else {
values.emplace_back(untuple(f.get()));
}
} else {
f.ignore_ready_future();
}
}
if (excp) {
return make_exception_future<std::vector<value_type>>(std::move(excp));
}
return make_ready_future<std::vector<value_type>>(std::move(values));
}
};

template<>
struct extract_values_from_futures_vector<future<>> {
using future_type = future<>;

static future_type run(std::vector<future<>> futures) {
std::exception_ptr excp;
for (auto&& f : futures) {
if (!excp) {
if (f.failed()) {
excp = f.get_exception();
}
} else {
f.ignore_ready_future();
}
}
if (excp) {
return make_exception_future<>(std::move(excp));
}
return make_ready_future<>();
}
};

}

/// Wait for many futures to complete (variadic version).
///
/// Given a variable number of futures as input, wait for all of them
/// to resolve, and return a future containing the values of each individual
/// resolved future.
/// In case any of the given futures fails one of the exceptions is returned
/// by this function as a failed future.
///
/// \param futures futures to wait for
/// \return future containing values of input futures
template<typename... Futures>
inline auto when_all_succeed(Futures&&... futures) {
using state = internal::when_all_state<internal::extract_values_from_futures_tuple<Futures...>, Futures...>;
auto s = make_lw_shared<state>(std::forward<Futures>(futures)...);
return s->wait_all(std::make_index_sequence<sizeof...(Futures)>());
}

/// Wait for many futures to complete (iterator version).
///
/// Given a range of futures as input, wait for all of them
/// to resolve, and return a future containing a vector of values of the
/// original futures.
/// In case any of the given futures fails one of the exceptions is returned
/// by this function as a failed future.
/// \param begin an \c InputIterator designating the beginning of the range of futures
/// \param end an \c InputIterator designating the end of the range of futures
/// \return an \c std::vector<> of all the valus in the input
template <typename FutureIterator, typename = typename std::iterator_traits<FutureIterator>::value_type>
inline auto
when_all_succeed(FutureIterator begin, FutureIterator end) {
using itraits = std::iterator_traits<FutureIterator>;
using result_transform = internal::extract_values_from_futures_vector<typename itraits::value_type>;
return internal::do_when_all<result_transform>(std::move(begin), std::move(end));
}

}

/// @}

#endif /* CORE_FUTURE_UTIL_HH_ */
Loading

0 comments on commit f07f8ed

Please sign in to comment.