-
Notifications
You must be signed in to change notification settings - Fork 37
[New Feature] Timeout-capable channels #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9b32652
c149418
1f89d8d
892ba14
3a82b34
aedccfa
76775cb
cdf5c50
af2b030
949aca1
9811914
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,6 +12,7 @@ | |
* Blocking (forever waiting to fetch). | ||
* Range-based for loop supported. | ||
* Close to prevent pushing and stop waiting to fetch. | ||
* Optional timeout for read/write operations using `std::chrono`. | ||
* Integrates well with STL algorithms in some cases. Eg: std::move(ch.begin(), ch.end(), ...). | ||
* Tested with GCC, Clang, and MSVC. | ||
|
||
|
@@ -29,7 +30,6 @@ see [CMakeLists.txt](./examples/cmake-project/CMakeLists.txt) from the [CMake pr | |
|
||
```c++ | ||
#include <cassert> | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert. |
||
#include <msd/channel.hpp> | ||
|
||
int main() { | ||
|
@@ -83,7 +83,6 @@ int main() { | |
|
||
```c++ | ||
#include <iostream> | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert. |
||
#include <msd/channel.hpp> | ||
|
||
int main() { | ||
|
@@ -100,6 +99,31 @@ int main() { | |
} | ||
``` | ||
|
||
```c++ | ||
#include <iostream> | ||
#include <msd/channel.hpp> | ||
|
||
int main() { | ||
msd::channel<int> ch{2}; | ||
ch.setTimeout(std::chrono::milliseconds(100)); | ||
|
||
std::clog << "Testing write timeout on full buffer:\n"; | ||
try { | ||
ch << 1; | ||
ch << 2; | ||
std::clog << "Attempting to write to full channel...\n"; | ||
ch << 3; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::clog << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
return 0; | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary. |
||
|
||
``` | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary. |
||
See [examples](examples). | ||
|
||
<br> | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
#include <chrono> | ||
#include <iostream> | ||
#include <thread> | ||
|
||
#include "msd/channel.hpp" | ||
|
||
// using namespace std::chrono_literals; for post-C++11 code, use this to save some headaches | ||
|
||
int main() | ||
{ | ||
// small capacity, short timeout | ||
msd::channel<int> ch{2}; | ||
ch.setTimeout(std::chrono::milliseconds(100)); | ||
|
||
std::cout << "Testing write timeout on full buffer:\n"; | ||
try { | ||
ch << 1; | ||
ch << 2; | ||
std::cout << "Attempting to write to full channel...\n"; | ||
ch << 3; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
std::cout << "\nTesting read timeout on empty channel:\n"; | ||
|
||
msd::channel<int> ch2{5}; | ||
ch2.setTimeout(std::chrono::milliseconds(200)); | ||
|
||
try { | ||
int value; | ||
std::cout << "Attempting to read from empty channel...\n"; | ||
ch2 >> value; | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Expected timeout occurred: " << e.what() << "\n"; | ||
} | ||
|
||
std::cout << "\nDemonstrating timeout with range-based for loop:\n"; | ||
|
||
msd::channel<int> ch3{5}; | ||
ch3.setTimeout(std::chrono::milliseconds(200)); // lower this to see the timeout | ||
|
||
// Producer | ||
std::thread writer([&ch3]() { | ||
std::this_thread::sleep_for(std::chrono::milliseconds(200)); | ||
ch3 << 1; | ||
std::this_thread::sleep_for(std::chrono::milliseconds(200)); | ||
ch3 << 2; | ||
ch3.close(); | ||
}); | ||
|
||
// Consumer | ||
try { | ||
for (const auto& value : ch3) { | ||
std::cout << "Received value: " << value << "\n"; | ||
} | ||
} | ||
catch (const msd::channel_timeout& e) { | ||
std::cout << "Timeout in for loop: " << e.what() << "\n"; | ||
} | ||
|
||
writer.join(); | ||
|
||
return 0; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,9 @@ | ||
// Copyright (C) 2023 Andrei Avram | ||
|
||
#ifndef MSD_CHANNEL_HPP_ | ||
#define MSD_CHANNEL_HPP_ | ||
|
||
#include <atomic> | ||
#include <chrono> | ||
#include <condition_variable> | ||
#include <cstdlib> | ||
#include <mutex> | ||
|
@@ -30,6 +30,14 @@ class closed_channel : public std::runtime_error { | |
explicit closed_channel(const char* msg) : std::runtime_error{msg} {} | ||
}; | ||
|
||
/** | ||
* @brief Exception thrown when channel operation times out. | ||
*/ | ||
class channel_timeout : public std::runtime_error { | ||
public: | ||
explicit channel_timeout(const char* msg) : std::runtime_error{msg} {} | ||
}; | ||
|
||
/** | ||
* @brief Thread-safe container for sharing data between threads. | ||
* | ||
|
@@ -56,10 +64,24 @@ class channel { | |
*/ | ||
explicit constexpr channel(size_type capacity); | ||
|
||
/** | ||
* Sets a timeout for channel operations. | ||
* | ||
* @param timeout Duration after which operations will time out. | ||
*/ | ||
template <typename Rep, typename Period> | ||
void setTimeout(const std::chrono::duration<Rep, Period>& timeout); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a really useful scenario where you want to set a timeout after the channel has been declared, clear the timeout, or change it? Or would it be enough to have an overloaded constructor that accepts the capacity and the timeout? And then There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, this is probably over-engineered since most of the times you do not really need that to change at runtime/past the program's initialization. The thing is that when I initially programmed this I tried to add time-out functionality on top of the original implementation as much as possible, so I was reluctant to touch the constructors, this is a good idea however and I'll refactor the PR as soon as I have time. |
||
|
||
/** | ||
* Clears any previously set timeout. | ||
*/ | ||
void clearTimeout() noexcept; | ||
|
||
/** | ||
* Pushes an element into the channel. | ||
* | ||
* @throws closed_channel if channel is closed. | ||
* @throws channel_timeout if operation times out. | ||
*/ | ||
template <typename Type> | ||
friend channel<typename std::decay<Type>::type>& operator<<(channel<typename std::decay<Type>::type>&, Type&&); | ||
|
@@ -68,6 +90,7 @@ class channel { | |
* Pops an element from the channel. | ||
* | ||
* @tparam Type The type of the elements | ||
* @throws channel_timeout if operation times out. | ||
*/ | ||
template <typename Type> | ||
friend channel<Type>& operator>>(channel<Type>&, Type&); | ||
|
@@ -114,14 +137,16 @@ class channel { | |
std::mutex mtx_; | ||
std::condition_variable cnd_; | ||
std::atomic<bool> is_closed_{false}; | ||
std::atomic<std::chrono::nanoseconds> timeout_{std::chrono::nanoseconds::zero()}; | ||
|
||
template <typename Predicate> | ||
bool waitWithTimeout(std::unique_lock<std::mutex>&, Predicate&&); | ||
bool waitBeforeRead(std::unique_lock<std::mutex>&); | ||
bool waitBeforeWrite(std::unique_lock<std::mutex>&); | ||
|
||
inline void waitBeforeRead(std::unique_lock<std::mutex>&); | ||
inline void waitBeforeWrite(std::unique_lock<std::mutex>&); | ||
friend class blocking_iterator<channel>; | ||
}; | ||
|
||
} // namespace msd | ||
|
||
#include "channel.inl" | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Typo. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please don't resolve conversations. They help to see if we understand one another. The author of the conversation resolves it when the changes are done accordingly. In this specific case, |
||
#endif // MSD_CHANNEL_HPP_ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,29 +1,67 @@ | ||
// Copyright (C) 2023 Andrei Avram | ||
|
||
namespace msd { | ||
|
||
template <typename T> | ||
constexpr channel<T>::channel(const size_type capacity) : cap_{capacity} | ||
{ | ||
} | ||
|
||
template <typename T> | ||
template <typename Rep, typename Period> | ||
void channel<T>::setTimeout(const std::chrono::duration<Rep, Period>& timeout) | ||
{ | ||
timeout_ = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout); | ||
} | ||
|
||
template <typename T> | ||
void channel<T>::clearTimeout() noexcept | ||
{ | ||
timeout_ = std::chrono::nanoseconds::zero(); | ||
} | ||
|
||
template <typename T> | ||
template <typename Predicate> | ||
bool channel<T>::waitWithTimeout(std::unique_lock<std::mutex>& lock, Predicate&& predicate) | ||
{ | ||
auto timeout = timeout_.load(std::memory_order_relaxed); | ||
if (timeout == std::chrono::nanoseconds::zero()) { | ||
cnd_.wait(lock, std::forward<Predicate>(predicate)); | ||
return true; | ||
} | ||
|
||
return cnd_.wait_for(lock, timeout, std::forward<Predicate>(predicate)); | ||
} | ||
andreiavrammsd marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
template <typename T> | ||
bool channel<T>::waitBeforeRead(std::unique_lock<std::mutex>& lock) | ||
{ | ||
return waitWithTimeout(lock, [this]() { return !empty() || closed(); }); | ||
} | ||
|
||
template <typename T> | ||
bool channel<T>::waitBeforeWrite(std::unique_lock<std::mutex>& lock) | ||
{ | ||
if (cap_ > 0 && size_ == cap_) { | ||
return waitWithTimeout(lock, [this]() { return size_ < cap_; }); | ||
} | ||
return true; | ||
} | ||
|
||
template <typename T> | ||
channel<typename std::decay<T>::type>& operator<<(channel<typename std::decay<T>::type>& ch, T&& in) | ||
{ | ||
if (ch.closed()) { | ||
throw closed_channel{"cannot write on closed channel"}; | ||
} | ||
|
||
{ | ||
std::unique_lock<std::mutex> lock{ch.mtx_}; | ||
ch.waitBeforeWrite(lock); | ||
|
||
if (!ch.waitBeforeWrite(lock)) { | ||
throw channel_timeout{"write operation timed out"}; | ||
} | ||
ch.queue_.push(std::forward<T>(in)); | ||
++ch.size_; | ||
} | ||
|
||
ch.cnd_.notify_one(); | ||
|
||
return ch; | ||
} | ||
|
||
|
@@ -33,20 +71,18 @@ channel<T>& operator>>(channel<T>& ch, T& out) | |
if (ch.closed() && ch.empty()) { | ||
return ch; | ||
} | ||
|
||
{ | ||
std::unique_lock<std::mutex> lock{ch.mtx_}; | ||
ch.waitBeforeRead(lock); | ||
|
||
if (!ch.waitBeforeRead(lock)) { | ||
throw channel_timeout{"read operation timed out"}; | ||
} | ||
if (!ch.empty()) { | ||
out = std::move(ch.queue_.front()); | ||
ch.queue_.pop(); | ||
--ch.size_; | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert. |
||
ch.cnd_.notify_one(); | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert. |
||
return ch; | ||
} | ||
|
||
|
@@ -67,15 +103,15 @@ void channel<T>::close() noexcept | |
{ | ||
{ | ||
std::unique_lock<std::mutex> lock{mtx_}; | ||
is_closed_.store(true); | ||
is_closed_.store(true, std::memory_order_relaxed); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea is that we do not need explicit memory ordering guarantees (I suppose), so we can give this internal hint in the hope that the compiler can provide the user with more optimized code. Suppose that in an unlikely scenario the user is handling In the case where this is not unsafe, it's basically free to do and it doesn't introduce significant complexity for the user (it is buried in the lib's code). |
||
} | ||
cnd_.notify_all(); | ||
} | ||
|
||
template <typename T> | ||
bool channel<T>::closed() const noexcept | ||
{ | ||
return is_closed_.load(); | ||
return is_closed_.load(std::memory_order_relaxed); | ||
} | ||
|
||
template <typename T> | ||
|
@@ -90,18 +126,4 @@ blocking_iterator<channel<T>> channel<T>::end() noexcept | |
return blocking_iterator<channel<T>>{*this}; | ||
} | ||
|
||
template <typename T> | ||
void channel<T>::waitBeforeRead(std::unique_lock<std::mutex>& lock) | ||
{ | ||
cnd_.wait(lock, [this]() { return !empty() || closed(); }); | ||
} | ||
|
||
template <typename T> | ||
void channel<T>::waitBeforeWrite(std::unique_lock<std::mutex>& lock) | ||
{ | ||
if (cap_ > 0 && size_ == cap_) { | ||
cnd_.wait(lock, [this]() { return size_ < cap_; }); | ||
} | ||
} | ||
|
||
} // namespace msd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"gcc-12 is already the newest version (12.3.0-1ubuntu1~22.04)."
Is
update-alternatives
enough?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By removing the
apt-get
call I get:I am honestly not familiar with Ubuntu so maybe I am not doing this correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you tried this locally, maybe you don't have GCC 12 already installed. On the GitHub runner, I see 12 is already installed, but not used as default. So just try it here without apt-get update and install.