@@ -76,20 +76,10 @@ bool StreamerNode::initialize(const std::string &input_url) {
7676 receiver_ = std::move (receiver);
7777
7878 streamingThread_ = std::thread (&StreamerNode::streamAudio, this );
79- streamFlag.store (true , std::memory_order_release);
8079 isInitialized_ = true ;
8180 return true ;
8281}
8382
84- void StreamerNode::stop (double when) {
85- AudioScheduledSourceNode::stop (when);
86- streamFlag.store (false , std::memory_order_release);
87- StreamingData dummy;
88- while (receiver_.try_receive (dummy) ==
89- channels::spsc::ResponseStatus::SUCCESS)
90- ; // clear the receiver
91- }
92-
9383bool StreamerNode::setupResampler () {
9484 // Allocate resampler context
9585 swrCtx_ = swr_alloc ();
@@ -126,7 +116,7 @@ bool StreamerNode::setupResampler() {
126116}
127117
128118void StreamerNode::streamAudio () {
129- while (streamFlag .load (std::memory_order_acquire)) {
119+ while (!isNodeFinished_ .load (std::memory_order_acquire)) {
130120 if (av_read_frame (fmtCtx_, pkt_) < 0 ) {
131121 return ;
132122 }
@@ -143,6 +133,10 @@ void StreamerNode::streamAudio() {
143133 }
144134 av_packet_unref (pkt_);
145135 }
136+ StreamingData dummy;
137+ while (receiver_.try_receive (dummy) ==
138+ channels::spsc::ResponseStatus::SUCCESS)
139+ ; // clear the receiver
146140}
147141
148142std::shared_ptr<AudioBus> StreamerNode::processNode (
@@ -151,6 +145,7 @@ std::shared_ptr<AudioBus> StreamerNode::processNode(
151145 size_t startOffset = 0 ;
152146 size_t offsetLength = 0 ;
153147 updatePlaybackInfo (processingBus, framesToProcess, startOffset, offsetLength);
148+ isNodeFinished_.store (isFinished (), std::memory_order_release);
154149
155150 if (!isPlaying () && !isStopScheduled ()) {
156151 processingBus->zero ();
@@ -171,10 +166,14 @@ std::shared_ptr<AudioBus> StreamerNode::processNode(
171166 alreadyProcessed += bufferRemaining;
172167 }
173168 StreamingData data;
174- receiver_.try_receive (data);
175- bufferedBus_ = std::make_shared<AudioBus>(std::move (data.bus ));
176- bufferedBusSize_ = data.size ;
177- processedSamples_ = 0 ;
169+ auto res = receiver_.try_receive (data);
170+ if (res == channels::spsc::ResponseStatus::SUCCESS) {
171+ bufferedBus_ = std::make_shared<AudioBus>(std::move (data.bus ));
172+ bufferedBusSize_ = data.size ;
173+ processedSamples_ = 0 ;
174+ } else {
175+ bufferedBus_ = nullptr ;
176+ }
178177 }
179178 if (bufferedBus_ != nullptr ) {
180179 for (int ch = 0 ; ch < processingBus->getNumberOfChannels (); ch++) {
@@ -223,7 +222,7 @@ bool StreamerNode::processFrameWithResampler(AVFrame *frame) {
223222 }
224223
225224 // if we would like to finish dont copy anything
226- if (!streamFlag. load (std::memory_order_acquire )) {
225+ if (this -> isFinished ( )) {
227226 return true ;
228227 }
229228 auto bus = AudioBus (
@@ -281,7 +280,7 @@ bool StreamerNode::setupDecoder() {
281280}
282281
283282void StreamerNode::cleanup () {
284- streamFlag. store ( false , std::memory_order_release) ;
283+ this -> playbackState_ = PlaybackState::FINISHED ;
285284 // cleanup cannot be called from the streaming thread so there is no need to
286285 // check if we are in the same thread
287286 streamingThread_.join ();
0 commit comments