diff --git a/doc/buffered_channel.qbk b/doc/buffered_channel.qbk index 3aa00c6f..0323cbfb 100644 --- a/doc/buffered_channel.qbk +++ b/doc/buffered_channel.qbk @@ -8,7 +8,7 @@ [section:buffered_channel Buffered Channel] __boost_fiber__ provides a bounded, buffered channel (MPMC queue) suitable to -synchonize fibers (running on same or different threads) via asynchronouss +synchronize fibers (running on the same or different threads) via asynchronous message passing. typedef boost::fibers::buffered_channel< int > channel_t; diff --git a/include/boost/fiber/buffered_channel.hpp b/include/boost/fiber/buffered_channel.hpp index 7ff7d5b7..492486d3 100644 --- a/include/boost/fiber/buffered_channel.hpp +++ b/include/boost/fiber/buffered_channel.hpp @@ -80,12 +80,22 @@ class buffered_channel { buffered_channel & operator=( buffered_channel const&) = delete; bool is_closed() const noexcept { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::defer_lock}; + for(;;) { + if(lk.try_lock()) + break; + context::active()->yield(); + } return is_closed_(); } void close() noexcept { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::defer_lock}; + for(;;) { + if(lk.try_lock()) + break; + context::active()->yield(); + } if ( ! closed_) { closed_ = true; waiting_producers_.notify_all(); @@ -94,7 +104,12 @@ class buffered_channel { } channel_op_status try_push( value_type const& value) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::defer_lock}; + for(;;) { + if(lk.try_lock()) + break; + context::active()->yield(); + } if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } @@ -107,8 +122,14 @@ class buffered_channel { return channel_op_status::success; } - channel_op_status try_push( value_type && value) { - detail::spinlock_lock lk{ splk_ }; + channel_op_status try_push( value_type && value) { + + detail::spinlock_lock lk{splk_, std::defer_lock}; + for(;;) { + if(lk.try_lock()) + break; + context::active()->yield(); + } if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } @@ -124,7 +145,11 @@ class buffered_channel { channel_op_status push( value_type const& value) { context * active_ctx = context::active(); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } @@ -142,7 +167,11 @@ class buffered_channel { channel_op_status push( value_type && value) { context * active_ctx = context::active(); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } @@ -178,7 +207,11 @@ class buffered_channel { context * active_ctx = context::active(); std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } @@ -201,7 +234,11 @@ class buffered_channel { context * active_ctx = context::active(); std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; } @@ -220,7 +257,12 @@ class buffered_channel { } channel_op_status try_pop( value_type & value) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::defer_lock}; + for(;;) { + if(lk.try_lock()) + break; + context::active()->yield(); + } if ( is_empty_() ) { return is_closed_() ? channel_op_status::closed @@ -235,7 +277,11 @@ class buffered_channel { channel_op_status pop( value_type & value) { context * active_ctx = context::active(); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( is_empty_() ) { if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; @@ -253,7 +299,11 @@ class buffered_channel { value_type value_pop() { context * active_ctx = context::active(); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( is_empty_() ) { if ( BOOST_UNLIKELY( is_closed_() ) ) { throw fiber_error{ @@ -283,7 +333,11 @@ class buffered_channel { context * active_ctx = context::active(); std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_); for (;;) { - detail::spinlock_lock lk{ splk_ }; + detail::spinlock_lock lk{splk_, std::try_to_lock}; + if (!lk) { + active_ctx->yield(); + continue; + } if ( is_empty_() ) { if ( BOOST_UNLIKELY( is_closed_() ) ) { return channel_op_status::closed; diff --git a/include/boost/fiber/detail/cpu_relax.hpp b/include/boost/fiber/detail/cpu_relax.hpp index 54cecf12..f79d0ed7 100644 --- a/include/boost/fiber/detail/cpu_relax.hpp +++ b/include/boost/fiber/detail/cpu_relax.hpp @@ -59,7 +59,11 @@ namespace detail { // processors // extended mnemonics (available with POWER7) // yield == or 27, 27, 27 +# if defined(__POWERPC__) // Darwin PPC +# define cpu_relax() asm volatile ("or r27,r27,r27" ::: "memory"); +# else # define cpu_relax() asm volatile ("or 27,27,27" ::: "memory"); +# endif #elif BOOST_ARCH_X86 # if BOOST_COMP_MSVC || BOOST_COMP_MSVC_EMULATED # define cpu_relax() YieldProcessor(); diff --git a/src/waker.cpp b/src/waker.cpp index 0abffd10..1444dea4 100644 --- a/src/waker.cpp +++ b/src/waker.cpp @@ -31,7 +31,11 @@ wait_queue::suspend_and_wait_until( detail::spinlock_lock & lk, // suspend this fiber if ( ! active_ctx->wait_until( timeout_time, lk, waker(w)) ) { // relock local lk - lk.lock(); + for(;;) { + if(lk.try_lock()) + break; + active_ctx->yield(); + } // remove from waiting-queue if ( w.is_linked()) { slist_.remove( w);