Skip to content

Commit

Permalink
Use promises when closing MediaStreams (and all sinks and sources) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lodoyun authored Jul 1, 2019
1 parent 28ba178 commit ad852d5
Show file tree
Hide file tree
Showing 18 changed files with 155 additions and 60 deletions.
5 changes: 3 additions & 2 deletions erizo/src/erizo/MediaDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#define ERIZO_SRC_ERIZO_MEDIADEFINITIONS_H_

#include <boost/thread/mutex.hpp>
#include <boost/thread/future.hpp>
#include <vector>
#include <algorithm>

Expand Down Expand Up @@ -158,7 +159,7 @@ class MediaSink: public virtual Monitor {
MediaSink() : audio_sink_ssrc_{0}, video_sink_ssrc_{0}, sink_fb_source_{nullptr} {}
virtual ~MediaSink() {}

virtual void close() = 0;
virtual boost::future<void> close() = 0;

private:
virtual int deliverAudioData_(std::shared_ptr<DataPacket> data_packet) = 0;
Expand Down Expand Up @@ -247,7 +248,7 @@ class MediaSource: public virtual Monitor {
video_sink_{nullptr}, audio_sink_{nullptr}, event_sink_{nullptr}, source_fb_sink_{nullptr} {}
virtual ~MediaSource() {}

virtual void close() = 0;
virtual boost::future<void> close() = 0;
};

} // namespace erizo
Expand Down
5 changes: 3 additions & 2 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ void MediaStream::syncClose() {
connection_.reset();
ELOG_DEBUG("%s message: Close ended", toLog());
}
void MediaStream::close() {

boost::future<void> MediaStream::close() {
ELOG_DEBUG("%s message: Async close called", toLog());
std::shared_ptr<MediaStream> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<MediaStream> stream) {
return asyncTask([shared_this] (std::shared_ptr<MediaStream> stream) {
shared_this->syncClose();
});
}
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
*/
virtual ~MediaStream();
bool init(bool doNotWaitForRemoteSdp);
void close() override;
boost::future<void> close() override;
virtual uint32_t getMaxVideoBW();
virtual uint32_t getBitrateFromMaxQualityLayer() { return bitrate_from_max_quality_layer_; }
virtual uint32_t getVideoBitrate() { return video_bitrate_; }
Expand Down
10 changes: 7 additions & 3 deletions erizo/src/erizo/OneToManyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,14 @@ namespace erizo {
}
}

void OneToManyProcessor::close() {
closeAll();
boost::future<void> OneToManyProcessor::close() {
return closeAll();
}

void OneToManyProcessor::closeAll() {
boost::future<void> OneToManyProcessor::closeAll() {
ELOG_DEBUG("OneToManyProcessor closeAll");
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
boost::future<void> f = p->get_future();
feedbackSink_ = nullptr;
publisher.reset();
boost::unique_lock<boost::mutex> lock(monitor_mutex_);
Expand All @@ -177,7 +179,9 @@ namespace erizo {
subscribers.erase(it++);
}
subscribers.clear();
p->set_value();
ELOG_DEBUG("ClosedAll media in this OneToMany");
return f;
}

} // namespace erizo
6 changes: 3 additions & 3 deletions erizo/src/erizo/OneToManyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#include <map>
#include <string>
#include <future> // NOLINT
#include <boost/thread/future.hpp>

#include "./MediaDefinitions.h"
#include "media/ExternalOutput.h"
Expand Down Expand Up @@ -47,7 +47,7 @@ class OneToManyProcessor : public MediaSink, public FeedbackSink {
*/
void removeSubscriber(const std::string& peer_id);

void close() override;
boost::future<void> close() override;

private:
typedef std::shared_ptr<MediaSink> sink_ptr;
Expand All @@ -57,7 +57,7 @@ class OneToManyProcessor : public MediaSink, public FeedbackSink {
int deliverVideoData_(std::shared_ptr<DataPacket> video_packet) override;
int deliverFeedback_(std::shared_ptr<DataPacket> fb_packet) override;
int deliverEvent_(MediaEventPtr event) override;
void closeAll();
boost::future<void> closeAll();
bool isSSRCFromAudio(uint32_t ssrc);
uint32_t translateAndMaybeAdaptForSimulcast(uint32_t orig_ssrc);
};
Expand Down
6 changes: 5 additions & 1 deletion erizo/src/erizo/media/ExternalInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ class ExternalInput : public MediaSource, public RTPDataReceiver {
void receiveRtpData(unsigned char* rtpdata, int len) override;
int sendPLI() override;

void close() override {}
boost::future<void> close() override {
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
p->set_value();
return p->get_future();
}

private:
boost::scoped_ptr<OutputProcessor> op_;
Expand Down
13 changes: 8 additions & 5 deletions erizo/src/erizo/media/ExternalOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ ExternalOutput::~ExternalOutput() {
ELOG_DEBUG("Destructing");
}

void ExternalOutput::close() {
boost::future<void> ExternalOutput::close() {
std::shared_ptr<ExternalOutput> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<ExternalOutput> connection) {
return asyncTask([shared_this] (std::shared_ptr<ExternalOutput> connection) {
shared_this->syncClose();
});
}
Expand Down Expand Up @@ -133,14 +133,17 @@ void ExternalOutput::syncClose() {

ELOG_DEBUG("Closed Successfully");
}

void ExternalOutput::asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f) {
boost::future<void> ExternalOutput::asyncTask(
std::function<void(std::shared_ptr<ExternalOutput>)> f) {
auto task_promise = std::make_shared<boost::promise<void>>();
std::weak_ptr<ExternalOutput> weak_this = shared_from_this();
worker_->task([weak_this, f] {
worker_->task([weak_this, f, task_promise] {
if (auto this_ptr = weak_this.lock()) {
f(this_ptr);
}
task_promise->set_value();
});
return task_promise->get_future();
}

void ExternalOutput::receiveRawData(const RawDataPacket& /*packet*/) {
Expand Down
4 changes: 2 additions & 2 deletions erizo/src/erizo/media/ExternalOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback
size_t payload_size,
const webrtc::WebRtcRTPHeader* rtp_header) override;

void close() override;
boost::future<void> close() override;

void write(std::shared_ptr<DataPacket> packet);

Expand Down Expand Up @@ -121,7 +121,7 @@ class ExternalOutput : public MediaSink, public RawDataReceiver, public Feedback

bool initContext();
int sendFirPacket();
void asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f);
boost::future<void> asyncTask(std::function<void(std::shared_ptr<ExternalOutput>)> f);
void queueData(char* buffer, int length, packetType type);
void queueDataAsync(std::shared_ptr<DataPacket> copied_packet);
void sendLoop();
Expand Down
5 changes: 4 additions & 1 deletion erizo/src/erizo/media/MediaProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ void InputProcessor::closeSink() {
this->close();
}

void InputProcessor::close() {
boost::future<void> InputProcessor::close() {
if (audioDecoder == 1) {
avcodec_close(aDecoderContext);
av_free(aDecoderContext);
Expand All @@ -308,6 +308,9 @@ void InputProcessor::close() {
free(unpackagedBuffer_); unpackagedBuffer_ = NULL;
free(unpackagedAudioBuffer_); unpackagedAudioBuffer_ = NULL;
free(decodedAudioBuffer_); decodedAudioBuffer_ = NULL;
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
p->set_value();
return p->get_future();
}

OutputProcessor::OutputProcessor() {
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/media/MediaProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class InputProcessor: public MediaSink {
int unpackageAudio(unsigned char* inBuff, int inBuffLen, unsigned char* outBuff);

void closeSink();
void close() override;
boost::future<void> close() override;

private:
int audioDecoder;
Expand Down
5 changes: 4 additions & 1 deletion erizo/src/erizo/media/SyntheticInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,11 @@ void SyntheticInput::sendAudioFrame(uint32_t size) {
delete header;
}

void SyntheticInput::close() {
boost::future<void> SyntheticInput::close() {
running_ = false;
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
p->set_value();
return p->get_future();
}

void SyntheticInput::calculateSizeAndPeriod(uint32_t video_bitrate, uint32_t audio_bitrate) {
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/media/SyntheticInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class SyntheticInput : public MediaSource, public FeedbackSink, public std::enab
std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>());
virtual ~SyntheticInput();
int sendPLI() override;
void close() override;
boost::future<void> close() override;
void start();

private:
Expand Down
12 changes: 10 additions & 2 deletions erizo/src/test/OneToManyProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ class MockPublisher: public erizo::MediaSource, public erizo::FeedbackSink {
source_fb_sink_ = this;
}
~MockPublisher() {}
void close() override {}
boost::future<void> close() override {
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
p->set_value();
return p->get_future();
}
int sendPLI() override { return 0; }
int deliverFeedback_(std::shared_ptr<DataPacket> packet) override {
return internalDeliverFeedback_(packet);
Expand All @@ -37,7 +41,11 @@ class MockSubscriber: public erizo::MediaSink, public erizo::FeedbackSource {
sink_fb_source_ = this;
}
~MockSubscriber() {}
void close() override {}
boost::future<void> close() override {
std::shared_ptr<boost::promise<void>> p = std::make_shared<boost::promise<void>>();
p->set_value();
return p->get_future();
}
int deliverAudioData_(std::shared_ptr<DataPacket> packet) override {
return internalDeliverAudioData_(packet);
}
Expand Down
6 changes: 3 additions & 3 deletions erizo/src/test/utils/Mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class MockQualityManager : public QualityManager {

class MockMediaSink : public MediaSink {
public:
void close() override {
internal_close();
boost::future<void> close() override {
return internal_close();
}
MOCK_METHOD0(internal_close, void());
MOCK_METHOD0(internal_close, boost::future<void>());
MOCK_METHOD2(deliverAudioDataInternal, void(char*, int));
MOCK_METHOD2(deliverVideoDataInternal, void(char*, int));
MOCK_METHOD1(deliverEventInternal, void(MediaEventPtr));
Expand Down
88 changes: 73 additions & 15 deletions erizoAPI/MediaStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,28 +53,18 @@ Nan::Persistent<Function> MediaStream::constructor;
MediaStream::MediaStream() : closed_{false}, id_{"undefined"} {
async_stats_ = new uv_async_t;
async_event_ = new uv_async_t;
future_async_ = new uv_async_t;
uv_async_init(uv_default_loop(), async_stats_, &MediaStream::statsCallback);
uv_async_init(uv_default_loop(), async_event_, &MediaStream::eventCallback);
uv_async_init(uv_default_loop(), future_async_, &MediaStream::promiseResolver);
}

MediaStream::~MediaStream() {
close();
ELOG_DEBUG("%s, message: Destroyed", toLog());
}

void MediaStream::close() {
ELOG_DEBUG("%s, message: Trying to close", toLog());
if (closed_) {
ELOG_DEBUG("%s, message: Already closed", toLog());
return;
}
ELOG_DEBUG("%s, message: Closing", toLog());
if (me) {
me->setMediaStreamStatsListener(nullptr);
me->setMediaStreamEventListener(nullptr);
me->close();
me.reset();
}
void MediaStream::closeEvents() {
has_stats_callback_ = false;
has_event_callback_ = false;
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(async_stats_))) {
Expand All @@ -87,8 +77,38 @@ void MediaStream::close() {
uv_close(reinterpret_cast<uv_handle_t*>(async_event_), destroyAsyncHandle);
}
async_event_ = nullptr;
closed_ = true;
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(future_async_))) {
ELOG_DEBUG("%s, message: Closing future handle", toLog());
uv_close(reinterpret_cast<uv_handle_t*>(future_async_), destroyAsyncHandle);
}
future_async_ = nullptr;
}

boost::future<void> MediaStream::close() {
auto close_promise = std::make_shared<boost::promise<void>>();
ELOG_DEBUG("%s, message: Trying to close", toLog());
if (closed_) {
ELOG_DEBUG("%s, message: Already closed", toLog());
close_promise->set_value();
return close_promise->get_future();
}
ELOG_DEBUG("%s, message: Closing", toLog());
if (me) {
me->setMediaStreamStatsListener(nullptr);
me->setMediaStreamEventListener(nullptr);
closed_ = true;
me->close().then([this, close_promise] (boost::future<void>) {
closeEvents();
me.reset();
close_promise->set_value();
});
} else {
closeEvents();
closed_ = true;
close_promise->set_value();
}
ELOG_DEBUG("%s, message: Closed", toLog());
return close_promise->get_future();
}

std::string MediaStream::toLog() {
Expand Down Expand Up @@ -166,9 +186,17 @@ NAN_METHOD(MediaStream::New) {

NAN_METHOD(MediaStream::close) {
MediaStream* obj = Nan::ObjectWrap::Unwrap<MediaStream>(info.Holder());
v8::Local<v8::Promise::Resolver> resolver = v8::Promise::Resolver::New(info.GetIsolate());
Nan::Persistent<v8::Promise::Resolver> *persistent = new Nan::Persistent<v8::Promise::Resolver>(resolver);
if (obj) {
obj->close();
obj->Ref();
obj->close().then(
[persistent, obj] (boost::future<void>) {
ELOG_DEBUG("%s, Close is finishied, resolving promise", obj->toLog());
obj->notifyFuture(persistent);
});
}
info.GetReturnValue().Set(resolver->GetPromise());
}

NAN_METHOD(MediaStream::init) {
Expand Down Expand Up @@ -466,3 +494,33 @@ NAUV_WORK_CB(MediaStream::eventCallback) {
}
ELOG_DEBUG("%s, message: eventsCallback finished", obj->toLog());
}


void MediaStream::notifyFuture(Nan::Persistent<v8::Promise::Resolver> *persistent) {
boost::mutex::scoped_lock lock(mutex);
if (!future_async_) {
return;
}
futures.push(persistent);
future_async_->data = this;
uv_async_send(future_async_);
}

NAUV_WORK_CB(MediaStream::promiseResolver) {
Nan::HandleScope scope;
MediaStream* obj = reinterpret_cast<MediaStream*>(async->data);
if (!obj || !obj->me) {
return;
}
boost::mutex::scoped_lock lock(obj->mutex);
ELOG_DEBUG("%s, message: promiseResolver", obj->toLog());
obj->futures_manager_.cleanResolvedFutures();
while (!obj->futures.empty()) {
auto persistent = obj->futures.front();
v8::Local<v8::Promise::Resolver> resolver = Nan::New(*persistent);
resolver->Resolve(Nan::GetCurrentContext(), Nan::New("").ToLocalChecked());
obj->futures.pop();
obj->Unref();
}
ELOG_DEBUG("%s, message: promiseResolver finished", obj->toLog());
}
Loading

0 comments on commit ad852d5

Please sign in to comment.