Skip to content

Commit

Permalink
Merge branch 'boostorg:develop' into zig-pkg
Browse files Browse the repository at this point in the history
  • Loading branch information
kassane authored Aug 20, 2023
2 parents f0af2f2 + 2e3594c commit cdba86c
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 15 deletions.
2 changes: 1 addition & 1 deletion doc/buffered_channel.qbk
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
[section:buffered_channel Buffered Channel]

__boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to
synchonize fibers (running on same or different threads) via asynchronouss
synchronize fibers (running on the same or different threads) via asynchronous
message passing.

typedef boost::fibers::buffered_channel< int > channel_t;
Expand Down
80 changes: 67 additions & 13 deletions include/boost/fiber/buffered_channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,22 @@ class buffered_channel {
buffered_channel & operator=( buffered_channel const&) = delete;

bool is_closed() const noexcept {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::defer_lock};
for(;;) {
if(lk.try_lock())
break;
context::active()->yield();
}
return is_closed_();
}

void close() noexcept {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::defer_lock};
for(;;) {
if(lk.try_lock())
break;
context::active()->yield();
}
if ( ! closed_) {
closed_ = true;
waiting_producers_.notify_all();
Expand All @@ -94,7 +104,12 @@ class buffered_channel {
}

channel_op_status try_push( value_type const& value) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::defer_lock};
for(;;) {
if(lk.try_lock())
break;
context::active()->yield();
}
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
}
Expand All @@ -107,8 +122,14 @@ class buffered_channel {
return channel_op_status::success;
}

channel_op_status try_push( value_type && value) {
detail::spinlock_lock lk{ splk_ };
channel_op_status try_push( value_type && value) {

detail::spinlock_lock lk{splk_, std::defer_lock};
for(;;) {
if(lk.try_lock())
break;
context::active()->yield();
}
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
}
Expand All @@ -124,7 +145,11 @@ class buffered_channel {
channel_op_status push( value_type const& value) {
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
}
Expand All @@ -142,7 +167,11 @@ class buffered_channel {
channel_op_status push( value_type && value) {
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
}
Expand Down Expand Up @@ -178,7 +207,11 @@ class buffered_channel {
context * active_ctx = context::active();
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
}
Expand All @@ -201,7 +234,11 @@ class buffered_channel {
context * active_ctx = context::active();
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
}
Expand All @@ -220,7 +257,12 @@ class buffered_channel {
}

channel_op_status try_pop( value_type & value) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::defer_lock};
for(;;) {
if(lk.try_lock())
break;
context::active()->yield();
}
if ( is_empty_() ) {
return is_closed_()
? channel_op_status::closed
Expand All @@ -235,7 +277,11 @@ class buffered_channel {
channel_op_status pop( value_type & value) {
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( is_empty_() ) {
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
Expand All @@ -253,7 +299,11 @@ class buffered_channel {
value_type value_pop() {
context * active_ctx = context::active();
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( is_empty_() ) {
if ( BOOST_UNLIKELY( is_closed_() ) ) {
throw fiber_error{
Expand Down Expand Up @@ -283,7 +333,11 @@ class buffered_channel {
context * active_ctx = context::active();
std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
for (;;) {
detail::spinlock_lock lk{ splk_ };
detail::spinlock_lock lk{splk_, std::try_to_lock};
if (!lk) {
active_ctx->yield();
continue;
}
if ( is_empty_() ) {
if ( BOOST_UNLIKELY( is_closed_() ) ) {
return channel_op_status::closed;
Expand Down
4 changes: 4 additions & 0 deletions include/boost/fiber/detail/cpu_relax.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ namespace detail {
// processors
// extended mnemonics (available with POWER7)
// yield == or 27, 27, 27
# if defined(__POWERPC__) // Darwin PPC
# define cpu_relax() asm volatile ("or r27,r27,r27" ::: "memory");
# else
# define cpu_relax() asm volatile ("or 27,27,27" ::: "memory");
# endif
#elif BOOST_ARCH_X86
# if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED
# define cpu_relax() YieldProcessor();
Expand Down
6 changes: 5 additions & 1 deletion src/waker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ wait_queue::suspend_and_wait_until( detail::spinlock_lock & lk,
// suspend this fiber
if ( ! active_ctx->wait_until( timeout_time, lk, waker(w)) ) {
// relock local lk
lk.lock();
for(;;) {
if(lk.try_lock())
break;
active_ctx->yield();
}
// remove from waiting-queue
if ( w.is_linked()) {
slist_.remove( w);
Expand Down

0 comments on commit cdba86c

Please sign in to comment.