Skip to content

[New Feature] Timeout-capable channels #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ jobs:

steps:
- uses: actions/checkout@v4

- name: Install GCC 12
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install -y gcc-12 g++-12 cmake
Comment on lines +53 to +54
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By removing the apt-get call I get:

| update-alternatives: error: alternative path /usr/bin/gcc-12 doesn't exist
[build/Ubuntu 22.04 GCC (Debug)-2    ]   ❌  Failure - Main Install GCC 12

I am honestly not familiar with Ubuntu so maybe I am not doing this correctly.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you tried this locally, maybe you don't have GCC 12 already installed. On the GitHub runner, I see 12 is already installed, but not used as default. So just try it here without apt-get update and install.

sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-12 100
sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-12 100
gcc --version
g++ --version

- name: Create Build Environment
# Some projects don't allow in-source building, so create a separate build directory
Expand Down
28 changes: 26 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Blocking (forever waiting to fetch).
* Range-based for loop supported.
* Close to prevent pushing and stop waiting to fetch.
* Optional timeout for read/write operations using `std::chrono`.
* Integrates well with STL algorithms in some cases. Eg: std::move(ch.begin(), ch.end(), ...).
* Tested with GCC, Clang, and MSVC.

Expand All @@ -29,7 +30,6 @@ see [CMakeLists.txt](./examples/cmake-project/CMakeLists.txt) from the [CMake pr

```c++
#include <cassert>

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert.

#include <msd/channel.hpp>

int main() {
Expand Down Expand Up @@ -83,7 +83,6 @@ int main() {

```c++
#include <iostream>

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert.

#include <msd/channel.hpp>

int main() {
Expand All @@ -100,6 +99,31 @@ int main() {
}
```

```c++
#include <iostream>
#include <msd/channel.hpp>

int main() {
msd::channel<int> ch{2};
ch.setTimeout(std::chrono::milliseconds(100));

std::clog << "Testing write timeout on full buffer:\n";
try {
ch << 1;
ch << 2;
std::clog << "Attempting to write to full channel...\n";
ch << 3;
}
catch (const msd::channel_timeout& e) {
std::clog << "Expected timeout occurred: " << e.what() << "\n";
}

return 0;
}

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary.


```

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary.

See [examples](examples).

<br>
Expand Down
3 changes: 3 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ add_example(example_multithreading multithreading.cpp)

add_example(example_streaming streaming.cpp)
run_example(example_streaming)

add_example(example_timeout timeout.cpp)
run_example(example_timeout)
67 changes: 67 additions & 0 deletions examples/timeout.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#include <chrono>
#include <iostream>
#include <thread>

#include "msd/channel.hpp"

// using namespace std::chrono_literals; for post-C++11 code, use this to save some headaches

int main()
{
// small capacity, short timeout
msd::channel<int> ch{2};
ch.setTimeout(std::chrono::milliseconds(100));

std::cout << "Testing write timeout on full buffer:\n";
try {
ch << 1;
ch << 2;
std::cout << "Attempting to write to full channel...\n";
ch << 3;
}
catch (const msd::channel_timeout& e) {
std::cout << "Expected timeout occurred: " << e.what() << "\n";
}

std::cout << "\nTesting read timeout on empty channel:\n";

msd::channel<int> ch2{5};
ch2.setTimeout(std::chrono::milliseconds(200));

try {
int value;
std::cout << "Attempting to read from empty channel...\n";
ch2 >> value;
}
catch (const msd::channel_timeout& e) {
std::cout << "Expected timeout occurred: " << e.what() << "\n";
}

std::cout << "\nDemonstrating timeout with range-based for loop:\n";

msd::channel<int> ch3{5};
ch3.setTimeout(std::chrono::milliseconds(200)); // lower this to see the timeout

// Producer
std::thread writer([&ch3]() {
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ch3 << 1;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
ch3 << 2;
ch3.close();
});

// Consumer
try {
for (const auto& value : ch3) {
std::cout << "Received value: " << value << "\n";
}
}
catch (const msd::channel_timeout& e) {
std::cout << "Timeout in for loop: " << e.what() << "\n";
}

writer.join();

return 0;
}
35 changes: 30 additions & 5 deletions include/msd/channel.hpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright (C) 2023 Andrei Avram

#ifndef MSD_CHANNEL_HPP_
#define MSD_CHANNEL_HPP_

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <mutex>
Expand All @@ -30,6 +30,14 @@ class closed_channel : public std::runtime_error {
explicit closed_channel(const char* msg) : std::runtime_error{msg} {}
};

/**
* @brief Exception thrown when channel operation times out.
*/
class channel_timeout : public std::runtime_error {
public:
explicit channel_timeout(const char* msg) : std::runtime_error{msg} {}
};

/**
* @brief Thread-safe container for sharing data between threads.
*
Expand All @@ -56,10 +64,24 @@ class channel {
*/
explicit constexpr channel(size_type capacity);

/**
* Sets a timeout for channel operations.
*
* @param timeout Duration after which operations will time out.
*/
template <typename Rep, typename Period>
void setTimeout(const std::chrono::duration<Rep, Period>& timeout);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a really useful scenario where you want to set a timeout after the channel has been declared, clear the timeout, or change it?

Or would it be enough to have an overloaded constructor that accepts the capacity and the timeout? And then timeout_ does not need to be atomic. Simplicity is usually better.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, this is probably over-engineered since most of the times you do not really need that to change at runtime/past the program's initialization.

The thing is that when I initially programmed this I tried to add time-out functionality on top of the original implementation as much as possible, so I was reluctant to touch the constructors, this is a good idea however and I'll refactor the PR as soon as I have time.


/**
* Clears any previously set timeout.
*/
void clearTimeout() noexcept;

/**
* Pushes an element into the channel.
*
* @throws closed_channel if channel is closed.
* @throws channel_timeout if operation times out.
*/
template <typename Type>
friend channel<typename std::decay<Type>::type>& operator<<(channel<typename std::decay<Type>::type>&, Type&&);
Expand All @@ -68,6 +90,7 @@ class channel {
* Pops an element from the channel.
*
* @tparam Type The type of the elements
* @throws channel_timeout if operation times out.
*/
template <typename Type>
friend channel<Type>& operator>>(channel<Type>&, Type&);
Expand Down Expand Up @@ -114,14 +137,16 @@ class channel {
std::mutex mtx_;
std::condition_variable cnd_;
std::atomic<bool> is_closed_{false};
std::atomic<std::chrono::nanoseconds> timeout_{std::chrono::nanoseconds::zero()};

template <typename Predicate>
bool waitWithTimeout(std::unique_lock<std::mutex>&, Predicate&&);
bool waitBeforeRead(std::unique_lock<std::mutex>&);
bool waitBeforeWrite(std::unique_lock<std::mutex>&);

inline void waitBeforeRead(std::unique_lock<std::mutex>&);
inline void waitBeforeWrite(std::unique_lock<std::mutex>&);
friend class blocking_iterator<channel>;
};

} // namespace msd

#include "channel.inl"

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't resolve conversations. They help to see if we understand one another. The author of the conversation resolves it when the changes are done accordingly.

In this specific case, #include "channel.inl" is needed.

#endif // MSD_CHANNEL_HPP_
76 changes: 49 additions & 27 deletions include/msd/channel.inl
Original file line number Diff line number Diff line change
@@ -1,29 +1,67 @@
// Copyright (C) 2023 Andrei Avram

namespace msd {

template <typename T>
constexpr channel<T>::channel(const size_type capacity) : cap_{capacity}
{
}

template <typename T>
template <typename Rep, typename Period>
void channel<T>::setTimeout(const std::chrono::duration<Rep, Period>& timeout)
{
timeout_ = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout);
}

template <typename T>
void channel<T>::clearTimeout() noexcept
{
timeout_ = std::chrono::nanoseconds::zero();
}

template <typename T>
template <typename Predicate>
bool channel<T>::waitWithTimeout(std::unique_lock<std::mutex>& lock, Predicate&& predicate)
{
auto timeout = timeout_.load(std::memory_order_relaxed);
if (timeout == std::chrono::nanoseconds::zero()) {
cnd_.wait(lock, std::forward<Predicate>(predicate));
return true;
}

return cnd_.wait_for(lock, timeout, std::forward<Predicate>(predicate));
}

template <typename T>
bool channel<T>::waitBeforeRead(std::unique_lock<std::mutex>& lock)
{
return waitWithTimeout(lock, [this]() { return !empty() || closed(); });
}

template <typename T>
bool channel<T>::waitBeforeWrite(std::unique_lock<std::mutex>& lock)
{
if (cap_ > 0 && size_ == cap_) {
return waitWithTimeout(lock, [this]() { return size_ < cap_; });
}
return true;
}

template <typename T>
channel<typename std::decay<T>::type>& operator<<(channel<typename std::decay<T>::type>& ch, T&& in)
{
if (ch.closed()) {
throw closed_channel{"cannot write on closed channel"};
}

{
std::unique_lock<std::mutex> lock{ch.mtx_};
ch.waitBeforeWrite(lock);

if (!ch.waitBeforeWrite(lock)) {
throw channel_timeout{"write operation timed out"};
}
ch.queue_.push(std::forward<T>(in));
++ch.size_;
}

ch.cnd_.notify_one();

return ch;
}

Expand All @@ -33,20 +71,18 @@ channel<T>& operator>>(channel<T>& ch, T& out)
if (ch.closed() && ch.empty()) {
return ch;
}

{
std::unique_lock<std::mutex> lock{ch.mtx_};
ch.waitBeforeRead(lock);

if (!ch.waitBeforeRead(lock)) {
throw channel_timeout{"read operation timed out"};
}
if (!ch.empty()) {
out = std::move(ch.queue_.front());
ch.queue_.pop();
--ch.size_;
}
}

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert.

ch.cnd_.notify_one();

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert.

return ch;
}

Expand All @@ -67,15 +103,15 @@ void channel<T>::close() noexcept
{
{
std::unique_lock<std::mutex> lock{mtx_};
is_closed_.store(true);
is_closed_.store(true, std::memory_order_relaxed);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that we do not need explicit memory ordering guarantees (I suppose), so we can give this internal hint in the hope that the compiler can provide the user with more optimized code.

Suppose that in an unlikely scenario the user is handling std::atomic variables close to a channel<T>::close() call, then the compiler could possibly generate better code.

In the case where this is not unsafe, it's basically free to do and it doesn't introduce significant complexity for the user (it is buried in the lib's code).

}
cnd_.notify_all();
}

template <typename T>
bool channel<T>::closed() const noexcept
{
return is_closed_.load();
return is_closed_.load(std::memory_order_relaxed);
}

template <typename T>
Expand All @@ -90,18 +126,4 @@ blocking_iterator<channel<T>> channel<T>::end() noexcept
return blocking_iterator<channel<T>>{*this};
}

template <typename T>
void channel<T>::waitBeforeRead(std::unique_lock<std::mutex>& lock)
{
cnd_.wait(lock, [this]() { return !empty() || closed(); });
}

template <typename T>
void channel<T>::waitBeforeWrite(std::unique_lock<std::mutex>& lock)
{
if (cap_ > 0 && size_ == cap_) {
cnd_.wait(lock, [this]() { return size_ < cap_; });
}
}

} // namespace msd
Loading
Loading