[feature] iterative when_all  #55

@kikaxa

Many code paths generate tasks dynamically or iteratively, and all of those tasks need to be waited on.
Currently one has to collect them in temporary storage and only then create the when_all composition.
The proposed API simply separates the when_all initialization and addition steps, so it can easily be used with loops, generators, fold expressions, etc.

Please let me know if the feature is OK with you, and share any comments on the proposed code below.
I can make a PR then.
// On a side note, while it should be possible to make an analogue for when_any, I can't think of many use cases for it right now, so I am not suggesting to add it.

/// Returns a when_all state that can be used with tasks from generators without intermediate storage.
/// Use operator()(task) to add tasks one at a time;
/// readyTask is the composite result task.
template<typename Ttask = async::task<void> >
auto when_all_iterative(size_t total_count)
{
    typedef typename std::decay<Ttask>::type task_type;
    typedef std::vector<task_type> result_type;

    struct when_all_iterative_state
    {
        typedef async::detail::when_all_state<result_type> state_type;

        void operator() (task_type&& source_task)
        {
            LIBASYNC_ASSERT(state, std::logic_error, "Adding to empty when_all");

            // Add a continuation to each task to add its result to the shared state
            // Last task sets the event result
            LIBASYNC_TRY {
                source_task.then(async::inline_scheduler(), async::detail::when_all_func_range<task_type, result_type>(next_index++, async::detail::ref_count_ptr<async::detail::when_all_state<result_type>>(state)));
            } LIBASYNC_CATCH(...) {
                // Make sure we don't leak memory if then() throws
                addFailed(1);
                LIBASYNC_RETHROW();
            }
        }

        void setTotal(size_t total_count)
        {
            LIBASYNC_ASSERT(!state, std::logic_error, "Initializing state twice");

            state = new state_type(total_count);
            state->result.resize(total_count);
            readyTask = state->event.get_task();
        }

        void addFailed(size_t failed = 1)
        {
            LIBASYNC_ASSERT(state, std::logic_error, "Adding to empty when_all");
            state->remove_ref(failed);
            next_index += failed;
        }

    protected:
        state_type* state = nullptr;
        size_t next_index = 0;
    public:
        async::task<result_type> readyTask = async::make_task<result_type>({});
    } state;

    if (total_count)
        state.setTotal(total_count);

    return state;
}
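
To make the init-then-add idea easier to discuss without Async++ internals, here is a rough standard-library-only sketch of the same pattern. Everything in it (`iterative_join`, `add()`, `ready`, `collect_squares`) is a hypothetical name made up for illustration and is not part of Async++. It assumes each producer calls its callback exactly once, and it hands out a callback per slot instead of attaching a continuation to a task.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for the proposed state object (not Async++ API).
// Initialization fixes the total count; add() registers one producer at a time.
template <typename T>
class iterative_join {
    struct shared_state {
        explicit shared_state(std::size_t n) : results(n), remaining(n) {}
        std::vector<T> results;              // one slot per registered producer
        std::atomic<std::size_t> remaining;  // producers that have not delivered yet
        std::promise<std::vector<T>> done;   // fulfilled by the last delivery
    };

public:
    // Initialization step: reserve the result slots up front.
    explicit iterative_join(std::size_t total_count)
        : state_(std::make_shared<shared_state>(total_count)) {
        ready = state_->done.get_future();
        if (total_count == 0)
            state_->done.set_value(std::vector<T>{});  // nothing to wait for
    }

    // Addition step: returns a callback that stores one result in the next slot.
    // Assumption: the caller invokes each callback exactly once.
    std::function<void(T)> add() {
        if (next_index_ >= state_->results.size())
            throw std::logic_error("more producers added than total_count");
        const std::size_t index = next_index_++;
        auto state = state_;  // keep the shared state alive inside the callback
        return [state, index](T value) {
            assert(index < state->results.size());
            state->results[index] = std::move(value);
            // The producer that brings the counter to zero publishes the results.
            if (state->remaining.fetch_sub(1) == 1)
                state->done.set_value(std::move(state->results));
        };
    }

    // Composite result, playing the role of readyTask in the proposal.
    std::future<std::vector<T>> ready;

private:
    std::shared_ptr<shared_state> state_;
    std::size_t next_index_ = 0;
};

// Usage: producers are registered inside the loop, with no temporary container.
// Here each callback is invoked immediately; it could equally be handed to
// another thread.
std::vector<int> collect_squares(int n) {
    iterative_join<int> join(static_cast<std::size_t>(n));
    for (int i = 0; i < n; ++i) {
        auto deliver = join.add();
        deliver(i * i);
    }
    return join.ready.get();
}
```

The point of the sketch is only the shape of the API: the count is fixed once, and each producer is registered where it is created. Failure handling (the `addFailed` path in the proposal) is left out.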
