Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 60 additions & 20 deletions src/common/include/display_device/retry_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#pragma once

// system includes
#include <algorithm>
#include <condition_variable>
#include <functional>
#include <memory>
Expand All @@ -15,11 +16,11 @@

namespace display_device {
/**
* @brief A convenience class for stoping the RetryScheduler.
* @brief A convenience class for stopping the RetryScheduler.
*
* It is conceptualy similar to `std::stop_token` except that it also used
* RAII to perform a cleanup. This allows to allows to return void types
* in the RetryScheduler without a hastle.
* It is conceptually similar to `std::stop_token` except that it also uses
* RAII to perform a cleanup. This allows to return void types
* in the RetryScheduler without a hassle.
*/
class SchedulerStopToken final {
public:
Expand Down Expand Up @@ -87,6 +88,23 @@ namespace display_device {
concept ExecuteCallbackLike = ExecuteWithoutStopToken<T, FunctionT> || ExecuteWithStopToken<T, FunctionT>;
} // namespace detail

/**
* @brief Scheduler options to be used when scheduling executor function.
*/
struct SchedulerOptions {
/**
* @brief Defines the executor's execution logic when it is scheduled.
*/
enum class Execution {
Immediate, ///< Executor is executed in the calling thread immediately and scheduled afterward.
ImmediateWithSleep, ///< The first sleep duration is TAKEN from `m_sleep_durations` and the calling thread is put to sleep. Once awoken, follows by same logic as `Immediate`.
ScheduledOnly ///< Executor is executed in the thread only.
};

std::vector<std::chrono::milliseconds> m_sleep_durations; ///< Specifies for long the scheduled thread sleeps before invoking executor. Last duration is reused indefinitely.
Execution m_execution { Execution::Immediate }; ///< Executor's execution logic.
};

/**
* @brief A wrapper class around an interface that provides a thread-safe access to the
* interface and allows to schedule arbitrary logic for it to retry until it succeeds.
Expand All @@ -106,9 +124,9 @@ namespace display_device {
std::unique_lock lock { m_mutex };
while (m_keep_alive) {
m_syncing_thread = false;
if (m_sleep_duration > std::chrono::milliseconds::zero()) {
if (auto duration { takeNextDuration(m_sleep_durations) }; duration > std::chrono::milliseconds::zero()) {
// We're going to sleep until manually woken up or the time elapses.
m_sleep_cv.wait_for(lock, m_sleep_duration, [this]() { return m_syncing_thread; });
m_sleep_cv.wait_for(lock, duration, [this]() { return m_syncing_thread; });
}
else {
// We're going to sleep until manually woken up.
Expand Down Expand Up @@ -153,9 +171,7 @@ namespace display_device {
* @param exec_fn Provides thread-safe access to the interface for executing arbitrary logic.
* It accepts a `stop_token` as a second parameter which can be used to stop
* the scheduler.
* @param interval Specify the interval for the scheduler.
* @note Before the executor function is scheduled, it is first executed in the calling thread
* immediately and the callback is invoked before returning!
* @param options Options for the scheduler.
* @note Previously scheduled executor is replaced by a new one!
* @examples
* std::unique_ptr<SettingsManagerInterface> iface = getIface(...);
Expand All @@ -165,17 +181,21 @@ namespace display_device {
* if (iface.revertSettings()) {
* stop_token.requestStop();
* }
* }, 5ms);
* }, { .m_sleep_durations = { 50ms, 10ms });
* @examples_end
*/
void
schedule(std::function<void(T &, SchedulerStopToken &stop_token)> exec_fn, std::chrono::milliseconds interval) {
schedule(std::function<void(T &, SchedulerStopToken &stop_token)> exec_fn, const SchedulerOptions &options) {
if (!exec_fn) {
throw std::logic_error { "Empty callback function provided in RetryScheduler::schedule!" };
}

if (interval == std::chrono::milliseconds::zero()) {
throw std::logic_error { "Interval cannot be zero in RetryScheduler::schedule!" };
if (options.m_sleep_durations.empty()) {
throw std::logic_error { "At least 1 sleep duration must be specified in RetryScheduler::schedule!" };
}

if (std::ranges::any_of(options.m_sleep_durations, [&](const auto &duration) { return duration == std::chrono::milliseconds::zero(); })) {
throw std::logic_error { "All of the durations specified in RetryScheduler::schedule must be larger than a 0!" };
}

std::lock_guard lock { m_mutex };
Expand All @@ -184,10 +204,18 @@ namespace display_device {
// We are catching the exception here instead of propagating to have
// similar try...catch login as in the scheduler thread.
try {
exec_fn(*m_iface, stop_token);
auto sleep_durations = options.m_sleep_durations;
if (options.m_execution != SchedulerOptions::Execution::ScheduledOnly) {
if (options.m_execution == SchedulerOptions::Execution::ImmediateWithSleep) {
std::this_thread::sleep_for(takeNextDuration(sleep_durations));
}

exec_fn(*m_iface, stop_token);
}

if (!stop_token.stopRequested()) {
m_retry_function = std::move(exec_fn);
m_sleep_duration = interval;
m_sleep_durations = std::move(sleep_durations);
syncThreadUnlocked();
}
}
Expand All @@ -199,7 +227,7 @@ namespace display_device {
}

/**
* @brief Execute a arbitrary logic using the provided interface in a thread-safe manner.
* @brief Execute arbitrary logic using the provided interface in a thread-safe manner.
* @param exec_fn Provides thread-safe access to the interface for executing arbitrary logic.
* Acceptable function signatures are:
* - AnyReturnType(T &);
Expand Down Expand Up @@ -271,12 +299,24 @@ namespace display_device {
}

private:
static std::chrono::milliseconds
takeNextDuration(std::vector<std::chrono::milliseconds> &durations) {
if (durations.size() > 1) {
const auto front_it { std::begin(durations) };
const auto front_value { *front_it };
durations.erase(front_it);
return front_value;
}

return durations.empty() ? std::chrono::milliseconds::zero() : durations.back();
}

/**
* @brief Clear the necessary data so that the thread will go into a deep sleep.
*/
void
clearThreadLoopUnlocked() {
m_sleep_duration = std::chrono::milliseconds::zero();
m_sleep_durations = {};
m_retry_function = nullptr;
}

Expand All @@ -301,12 +341,12 @@ namespace display_device {
}

std::unique_ptr<T> m_iface; /**< Interface to be passed around to the executor functions. */
std::chrono::milliseconds m_sleep_duration { 0 }; /**< A retry time for the timer. */
std::vector<std::chrono::milliseconds> m_sleep_durations; /**< Sleep times for the timer. */
std::function<void(T &, SchedulerStopToken &)> m_retry_function { nullptr }; /**< Function to be executed until it succeeds. */

std::mutex m_mutex {}; /**< A mutext for synchronizing thread and "external" access. */
std::mutex m_mutex {}; /**< A mutex for synchronizing thread and "external" access. */
std::condition_variable m_sleep_cv {}; /**< Condition variable for waking up thread. */
bool m_syncing_thread { false }; /**< Safeguard for the condition variable to prevent sporadic thread wake ups. */
bool m_syncing_thread { false }; /**< Safeguard for the condition variable to prevent sporadic thread wake-ups. */
bool m_keep_alive { true }; /**< When set to false, scheduler thread will exit. */

// Always the last in the list so that all the members are already initialized!
Expand Down
Loading