Skip to content

Commit

Permalink
Fixed up the new closable thread queue.
Browse files Browse the repository at this point in the history
  • Loading branch information
fpagliughi committed Jul 12, 2024
1 parent 1b71a16 commit 3b857a5
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 64 deletions.
108 changes: 51 additions & 57 deletions include/mqtt/thread_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,9 @@ class thread_queue
/** General purpose guard */
using unique_guard = std::unique_lock<std::mutex>;

/** Throw an excpetion if the queue is closed. */
void check_closed() {
if (closed_) throw queue_closed{};
}

/** Throw an excpetion if the queue is done. */
void check_done() {
if (closed_ && que_.empty()) throw queue_closed{};
/** Checks if the queue is done (unsafe) */
bool is_done() const {
return closed_ && que_.empty();
}

public:
Expand Down Expand Up @@ -162,15 +157,15 @@ class thread_queue
* a sufficient number
*/
void capacity(size_type cap) {
guard g(lock_);
guard g{lock_};
cap_ = cap;
}
/**
* Gets the number of items in the queue.
* @return The number of items in the queue.
*/
size_type size() const {
guard g(lock_);
guard g{lock_};
return que_.size();
}
/**
Expand All @@ -180,18 +175,12 @@ class thread_queue
* it is empty.
*/
void close() {
guard g{lock_};
unique_guard g{lock_};
closed_ = true;
}
/*
void close(value_type finalVal) {
unique_guard g(lock_);
if (closed_) return;
que_.emplace(std::move(finalVal));
g.unlock();
notEmptyCond_.notify_one();
notFullCond_.notify_all();
notEmptyCond_.notify_all();
}
*/
/**
* Determines if the queue is closed.
* Once closed, the queue will not accept any new items, but receievers
Expand All @@ -204,14 +193,14 @@ class thread_queue
return closed_;
}
/**
* Determines if all possible operations are done on the queue. If the
* queue is closed and empty, then no further useful operations can be
* done on it.
* Determines if all possible operations are done on the queue.
* If the queue is closed and empty, then no further useful operations
* can be done on it.
* @return @true if the queue is closed and empty, @em false otherwise.
*/
bool done() const {
guard g{lock_};
return closed_ && que_.empty();
return is_done();
}
/**
* Put an item into the queue.
Expand All @@ -220,10 +209,10 @@ class thread_queue
* @param val The value to add to the queue.
*/
void put(value_type val) {
unique_guard g(lock_);
unique_guard g{lock_};
notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });

check_closed();
if (closed_) throw queue_closed{};
que_.emplace(std::move(val));
g.unlock();
notEmptyCond_.notify_one();
Expand All @@ -235,9 +224,8 @@ class thread_queue
* item was not added because the queue is currently full.
*/
bool try_put(value_type val) {
unique_guard g(lock_);
check_closed();
if (que_.size() >= cap_)
unique_guard g{lock_};
if (que_.size() >= cap_ || closed_)
return false;

que_.emplace(std::move(val));
Expand All @@ -256,11 +244,12 @@ class thread_queue
*/
template <typename Rep, class Period>
bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
unique_guard g(lock_);
bool to = !notFullCond_.wait_for(g, relTime,
[this] { return que_.size() < cap_ || closed_; });
check_closed();
if (to)
unique_guard g{lock_};
bool to = !notFullCond_.wait_for(
g, relTime,
[this] { return que_.size() < cap_ || closed_; }
);
if (to || closed_)
return false;

que_.emplace(std::move(val));
Expand All @@ -282,12 +271,13 @@ class thread_queue
bool try_put_until(
value_type val, const std::chrono::time_point<Clock, Duration>& absTime
) {
unique_guard g(lock_);
bool to = !notFullCond_.wait_until(g, absTime,
[this] { return que_.size() < cap_ || closed_; });
unique_guard g{lock_};
bool to = !notFullCond_.wait_until(
g, absTime,
[this] { return que_.size() < cap_ || closed_; }
);

check_closed();
if (to)
if (to || closed_)
return false;

que_.emplace(std::move(val));
Expand All @@ -301,18 +291,20 @@ class thread_queue
* added to the queue by another thread,
* @param val Pointer to a variable to receive the value.
*/
void get(value_type* val) {
bool get(value_type* val) {
if (!val)
return;
return false;

unique_guard g(lock_);
unique_guard g{lock_};
notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
check_done();
if (que_.empty()) // We must be done
return false;

*val = std::move(que_.front());
que_.pop();
g.unlock();
notFullCond_.notify_one();
return true;
}
/**
* Retrieve a value from the queue.
Expand All @@ -321,9 +313,10 @@ class thread_queue
* @return The value removed from the queue
*/
value_type get() {
unique_guard g(lock_);
unique_guard g{lock_};
notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
check_done();
if (que_.empty()) // We must be done
throw queue_closed{};

value_type val = std::move(que_.front());
que_.pop();
Expand All @@ -343,12 +336,9 @@ class thread_queue
if (!val)
return false;

unique_guard g(lock_);
if (que_.empty()) {
if (closed_)
throw queue_closed{};
unique_guard g{lock_};
if (que_.empty())
return false;
}

*val = std::move(que_.front());
que_.pop();
Expand All @@ -371,11 +361,13 @@ class thread_queue
if (!val)
return false;

unique_guard g(lock_);
bool to = !notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; });
unique_guard g{lock_};
notEmptyCond_.wait_for(
g, relTime,
[this] { return !que_.empty() || closed_; }
);

check_done();
if (to)
if (que_.empty())
return false;

*val = std::move(que_.front());
Expand All @@ -401,10 +393,12 @@ class thread_queue
if (!val)
return false;

unique_guard g(lock_);
bool to = !notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty(); });
check_done();
if (to)
unique_guard g{lock_};
notEmptyCond_.wait_until(
g, absTime, [this] { return !que_.empty() || closed_; }
);

if (que_.empty())
return false;

*val = std::move(que_.front());
Expand Down
47 changes: 40 additions & 7 deletions test/unit/test_thread_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ TEST_CASE("thread_queue tryget", "[thread_queue]")
REQUIRE(n == 3);
}

TEST_CASE("thread_queue tryput", "[thread_queue]")
{
thread_queue<int> que{2};

REQUIRE(que.try_put(1));
REQUIRE(que.try_put(2));

// Queue full. Put should fail
REQUIRE(!que.try_put(3));
REQUIRE(!que.try_put_for(3, 5ms));

auto timeout = steady_clock::now() + 15ms;
REQUIRE(!que.try_put_until(3, timeout));
}

TEST_CASE("thread_queue mt put/get", "[thread_queue]")
{
thread_queue<string> que;
Expand Down Expand Up @@ -134,21 +149,39 @@ TEST_CASE("thread_queue close", "[thread_queue]")
REQUIRE(que.size() == 2);

REQUIRE_THROWS_AS(que.put(3), queue_closed);
REQUIRE_THROWS_AS(que.try_put(3), queue_closed);
REQUIRE_THROWS_AS(que.try_put_for(3, 10ms), queue_closed);
REQUIRE_THROWS_AS(que.try_put_until(3, steady_clock::now() + 10ms), queue_closed);
REQUIRE(!que.try_put(3));
REQUIRE(!que.try_put_for(3, 10ms));
REQUIRE(!que.try_put_until(3, steady_clock::now() + 10ms));

// But can get any items already in there.
REQUIRE(que.get() == 1);
REQUIRE(que.get() == 2);

// When done (closed and empty), should throw on a get()
// When done (closed and empty), should throw on a get(),
// or fail on a try_get
REQUIRE(que.empty());
REQUIRE(que.done());

int n;
REQUIRE_THROWS_AS(que.get(), queue_closed);
REQUIRE_THROWS_AS(que.try_get(&n), queue_closed);
REQUIRE_THROWS_AS(que.try_get_for(&n, 10ms), queue_closed);
REQUIRE_THROWS_AS(que.try_get_until(&n, steady_clock::now() + 10ms), queue_closed);
REQUIRE(!que.try_get(&n));
REQUIRE(!que.try_get_for(&n, 10ms));
REQUIRE(!que.try_get_until(&n, steady_clock::now() + 10ms));
}

TEST_CASE("thread_queue close_signals", "[thread_queue]")
{
thread_queue<int> que;
REQUIRE(!que.closed());

auto thr = std::thread([&que] {
std::this_thread::sleep_for(10ms);
que.close();
});

// Should initially block, but then throw when the queue
// is closed by the other thread.
REQUIRE_THROWS_AS(que.get(), queue_closed);

thr.join();
}

0 comments on commit 3b857a5

Please sign in to comment.