Skip to content

Commit

Permalink
+ task running loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Harold2017 committed Mar 29, 2022
1 parent de2dff0 commit 7fecb82
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ FetchContent_Declare(fmt
FetchContent_MakeAvailable(fmt)


add_executable(test test/event.cpp)
add_executable(test test/loop.cpp)
target_include_directories(test PRIVATE ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(test PRIVATE fmt::fmt)
30 changes: 30 additions & 0 deletions include/coro/handle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include <cstdint>

namespace coro
{
using HandleID = uint64_t;

class handle
{
public:
handle() : id(id_gen++) { }
virtual ~handle() = default;

HandleID get_handle_id() { return id; }

virtual void run() = 0;
virtual void dump_backtrace(size_t) const { }

private:
HandleID id;
inline static HandleID id_gen = 0;
};

struct handle_wrapper
{
HandleID id;
handle* handle;
};
}
91 changes: 91 additions & 0 deletions include/coro/loop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#pragma once

#include "handle.h"
#include "task.h"
#include <queue>
#include <chrono>
#include <vector>
#include <algorithm>
#include <thread>

namespace coro
{
class Loop
{
using MS = std::chrono::milliseconds;
using delayed_handle = std::pair<MS, handle_wrapper>;
using clock = std::chrono::steady_clock;

public:
Loop()
{
startup_time = std::chrono::duration_cast<MS>(clock::now().time_since_epoch());
}
~Loop() = default;
Loop(Loop const&) = delete;
Loop(Loop&&) = delete;
Loop& operator=(Loop const&) = delete;
Loop& operator=(Loop&&) = delete;

void call(handle& _handle)
{
handles.push({ _handle.get_handle_id(), &_handle });
}

template<typename Ret>
void call(task<Ret>& _task)
{
call(_task.promise());
}

template<typename Rep, typename Period, typename Ret>
void call_after(std::chrono::duration<Rep, Period> delay, task<Ret>& _task)
{
auto t = std::chrono::duration_cast<MS>(delay) + now();
delayed_handles.emplace_back(t, handle_wrapper{ _task.promise().get_handle_id(), &_task.promise() });
std::ranges::push_heap(delayed_handles, std::ranges::greater{}, &delayed_handle::first); // min heap
}

void run_until_complete()
{
while(!is_stop()) run_once();
}

private:
bool is_stop()
{
return handles.empty() && delayed_handles.empty();
}

void run_once()
{
auto current = now();
while (!delayed_handles.empty())
{
auto&& [t, h] = delayed_handles[0];
if (t >= current) break;
handles.push(h);
std::ranges::pop_heap(delayed_handles, std::ranges::greater{}, &delayed_handle::first);
delayed_handles.pop_back();
}

for (size_t i = 0, n = handles.size(); i < n; i++)
{
auto [id, h] = handles.front();
handles.pop();
h->run();
}
}

MS now()
{
return std::chrono::duration_cast<MS>(clock::now().time_since_epoch()) - startup_time;
}

private:
std::queue<handle_wrapper> handles;

MS startup_time;
std::vector<delayed_handle> delayed_handles; // minimum time heap
};
}
15 changes: 13 additions & 2 deletions include/coro/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@

#include <fmt/core.h>

#include "handle.h"

namespace coro
{
template<typename Ret = void>
struct task;

namespace detail
{
struct promise_base
struct promise_base : handle
{
struct final_awaiter
{
Expand Down Expand Up @@ -47,7 +49,7 @@ namespace coro
}

std::source_location const& get_frame_info() const { return m_frame_info; }
void dump_backtrace(size_t depth = 0) const
void dump_backtrace(size_t depth = 0) const override final
{
auto frame_name = fmt::format("{} at {}:{}", m_frame_info.function_name(), m_frame_info.file_name(), m_frame_info.line());
fmt::print("[{}] {}\n", depth, frame_name);
Expand Down Expand Up @@ -85,6 +87,8 @@ namespace coro
return std::move(m_ret_value);
}

void run() override final;

private:
Ret m_ret_value;
};
Expand All @@ -104,6 +108,8 @@ namespace coro
if (m_exception_ptr)
std::rethrow_exception(m_exception_ptr);
}

void run() override final;
};
}

Expand Down Expand Up @@ -217,6 +223,11 @@ namespace coro

inline task<> promise<void>::get_return_object() noexcept { return task<>(handle_type::from_promise(*this)); }

template<typename Ret>
inline void promise<Ret>::run() { auto h = handle_type::from_promise(*this); if (h != nullptr && !h.done()) h.resume(); }

inline void promise<void>::run() { auto h = handle_type::from_promise(*this); if (h != nullptr && !h.done()) h.resume(); }

struct CallStackAwaiter
{
bool await_ready() const noexcept { return false; }
Expand Down
46 changes: 46 additions & 0 deletions test/loop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "coro/loop.h"
#include "coro/task.h"

using namespace coro;

int main()
{
auto t = []() -> task<int> { co_await dump_callstack(); co_return 1; };

auto sum = [&]() -> task<> {
int sum = 0;
for (int i = 0; i < 10; i++)
sum += co_await t();

fmt::print("sum == {}: {}\n", sum, sum == 10);
co_return;
}();

//sum.promise().run();

Loop loop;
fmt::print("create loop\n");

//loop.call(sum.promise());
loop.call(sum);
fmt::print("push task sum into loop queue\n");

auto j = []() -> task<> { co_await dump_callstack(); }();
loop.call(j);
fmt::print("push task j into loop queue\n");

auto k = []() -> task<int> { co_await dump_callstack(); co_return 2; }();
using namespace std::chrono_literals;
loop.call_after(std::chrono::duration(1s), k);
fmt::print("push task k into loop queue\n");

auto h = []() -> task<> { co_await dump_callstack(); }();
using namespace std::chrono_literals;
loop.call_after(std::chrono::duration(2s), h);
fmt::print("push task h into loop queue\n");

// add all tasks before this
loop.run_until_complete();

return 0;
}

0 comments on commit 7fecb82

Please sign in to comment.