Skip to content

Commit

Permalink
MediaStreams now use their own worker (#1107)
Browse files Browse the repository at this point in the history
  • Loading branch information
lodoyun authored Jan 17, 2018
1 parent 340ef69 commit 716a942
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 70 deletions.
14 changes: 6 additions & 8 deletions erizo/src/erizo/DtlsTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ DtlsTransport::DtlsTransport(MediaType med, const std::string &transport_name, c
const IceConfig& iceConfig, std::string username, std::string password,
bool isServer, std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker):
Transport(med, transport_name, connection_id, bundle, rtcp_mux, transport_listener, iceConfig, worker, io_worker),
unprotect_packet_{std::make_shared<DataPacket>()},
readyRtp(false), readyRtcp(false), isServer_(isServer) {
ELOG_DEBUG("%s message: constructor, transportName: %s, isBundle: %d", toLog(), transport_name.c_str(), bundle);
dtlsRtp.reset(new DtlsSocketContext());
Expand Down Expand Up @@ -182,21 +181,20 @@ void DtlsTransport::onIceData(packetPtr packet) {
}
return;
} else if (this->getTransportState() == TRANSPORT_READY) {
unprotect_packet_->length = len;
unprotect_packet_->received_time_ms = packet->received_time_ms;
memcpy(unprotect_packet_->data, data, len);
std::shared_ptr<DataPacket> unprotect_packet = std::make_shared<DataPacket>(component_id,
data, len, VIDEO_PACKET, packet->received_time_ms);

if (dtlsRtcp != NULL && component_id == 2) {
srtp = srtcp_.get();
}
if (srtp != NULL) {
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(unprotect_packet_->data);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(unprotect_packet->data);
if (chead->isRtcp()) {
if (srtp->unprotectRtcp(unprotect_packet_->data, &unprotect_packet_->length) < 0) {
if (srtp->unprotectRtcp(unprotect_packet->data, &unprotect_packet->length) < 0) {
return;
}
} else {
if (srtp->unprotectRtp(unprotect_packet_->data, &unprotect_packet_->length) < 0) {
if (srtp->unprotectRtp(unprotect_packet->data, &unprotect_packet->length) < 0) {
return;
}
}
Expand All @@ -208,7 +206,7 @@ void DtlsTransport::onIceData(packetPtr packet) {
return;
}
if (auto listener = getTransportListener().lock()) {
listener->onTransportData(unprotect_packet_, this);
listener->onTransportData(unprotect_packet, this);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion erizo/src/erizo/DtlsTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class DtlsTransport : dtls::DtlsReceiver, public Transport {

private:
char protectBuf_[5000];
std::shared_ptr<DataPacket> unprotect_packet_;
boost::scoped_ptr<dtls::DtlsSocketContext> dtlsRtp, dtlsRtcp;
boost::mutex writeMutex_, sessionMutex_;
boost::scoped_ptr<SrtpChannel> srtp_, srtcp_;
Expand Down
55 changes: 32 additions & 23 deletions erizo/src/erizo/MediaStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@
namespace erizo {
DEFINE_LOGGER(MediaStream, "MediaStream");

MediaStream::MediaStream(
MediaStream::MediaStream(std::shared_ptr<Worker> worker,
std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id) :
audio_enabled_{false}, video_enabled_{false},
connection_{connection},
stream_id_{media_stream_id},
bundle_{false},
pipeline_{Pipeline::create()}, audio_muted_{false}, video_muted_{false},
pipeline_{Pipeline::create()},
worker_{worker},
audio_muted_{false}, video_muted_{false},
pipeline_initialized_{false} {
setVideoSinkSSRC(kDefaultVideoSinkSSRC);
setAudioSinkSSRC(kDefaultAudioSinkSSRC);
Expand All @@ -57,7 +59,6 @@ MediaStream::MediaStream(
stats_ = connection->getStatsService();
quality_manager_ = std::make_shared<QualityManager>();
packet_buffer_ = std::make_shared<PacketBufferService>();
worker_ = connection->getWorker();

rtcp_processor_ = std::make_shared<RtcpForwarder>(static_cast<MediaSink*>(this), static_cast<MediaSource*>(this));

Expand All @@ -72,13 +73,10 @@ MediaStream::MediaStream(

MediaStream::~MediaStream() {
ELOG_DEBUG("%s message:Destructor called", toLog());
if (sending_) {
close();
}
ELOG_DEBUG("%s message: Destructor ended", toLog());
}

void MediaStream::close() {
void MediaStream::syncClose() {
ELOG_DEBUG("%s message:Close called", toLog());
if (!sending_) {
return;
Expand All @@ -87,11 +85,19 @@ void MediaStream::close() {
video_sink_ = nullptr;
audio_sink_ = nullptr;
fb_sink_ = nullptr;
pipeline_initialized_ = false;
pipeline_->close();
pipeline_.reset();
connection_.reset();
ELOG_DEBUG("%s message: Close ended", toLog());
}
void MediaStream::close() {
ELOG_DEBUG("%s message: Async close called", toLog());
std::shared_ptr<MediaStream> shared_this = shared_from_this();
asyncTask([shared_this] (std::shared_ptr<MediaStream> stream) {
shared_this->syncClose();
});
}

bool MediaStream::init() {
return true;
Expand Down Expand Up @@ -151,7 +157,6 @@ void MediaStream::initializePipeline() {

pipeline_->addFront(PacketReader(this));

pipeline_->addFront(LayerDetectorHandler());
pipeline_->addFront(RtcpProcessorHandler());
pipeline_->addFront(FecReceiverHandler());
pipeline_->addFront(LayerBitrateCalculationHandler());
Expand All @@ -167,6 +172,7 @@ void MediaStream::initializePipeline() {
pipeline_->addFront(RtpRetransmissionHandler());
pipeline_->addFront(SRPacketHandler());
pipeline_->addFront(SenderBandwidthEstimationHandler());
pipeline_->addFront(LayerDetectorHandler());
pipeline_->addFront(OutgoingStatsHandler());

pipeline_->addFront(PacketWriter(this));
Expand Down Expand Up @@ -225,25 +231,28 @@ void MediaStream::onTransportData(std::shared_ptr<DataPacket> packet, Transport
} else if (transport->mediaType == VIDEO_TYPE) {
packet->type = VIDEO_PACKET;
}
auto stream_ptr = shared_from_this();

char* buf = packet->data;
RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
if (!chead->isRtcp()) {
uint32_t recvSSRC = head->getSSRC();
if (isVideoSourceSSRC(recvSSRC)) {
packet->type = VIDEO_PACKET;
} else if (isAudioSourceSSRC(recvSSRC)) {
packet->type = AUDIO_PACKET;
worker_->task([stream_ptr, packet]{
if (!stream_ptr->pipeline_initialized_) {
ELOG_DEBUG("%s message: Pipeline not initialized yet.", stream_ptr->toLog());
return;
}
}

if (!pipeline_initialized_) {
ELOG_DEBUG("%s message: Pipeline not initialized yet.", toLog());
return;
}
char* buf = packet->data;
RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
if (!chead->isRtcp()) {
uint32_t recvSSRC = head->getSSRC();
if (stream_ptr->isVideoSourceSSRC(recvSSRC)) {
packet->type = VIDEO_PACKET;
} else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) {
packet->type = AUDIO_PACKET;
}
}

pipeline_->read(std::move(packet));
stream_ptr->pipeline_->read(std::move(packet));
});
}

void MediaStream::read(std::shared_ptr<DataPacket> packet) {
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/MediaStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ class MediaStream: public MediaSink, public MediaSource, public FeedbackSink,
* Constructor.
* Constructs an empty MediaStream without any configuration.
*/
MediaStream(std::shared_ptr<WebRtcConnection> connection,
MediaStream(std::shared_ptr<Worker> worker, std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id);
/**
* Destructor.
*/
virtual ~MediaStream();
bool init();
void close() override;
void syncClose();
bool setRemoteSdp(std::shared_ptr<SdpInfo> sdp);
bool setLocalSdp(std::shared_ptr<SdpInfo> sdp);

Expand Down
20 changes: 0 additions & 20 deletions erizo/src/erizo/OneToManyProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,43 +114,23 @@ namespace erizo {
ELOG_DEBUG("Remove subscriber %s", peer_id.c_str());
boost::mutex::scoped_lock lock(monitor_mutex_);
if (this->subscribers.find(peer_id) != subscribers.end()) {
deleteAsync(std::dynamic_pointer_cast<MediaStream>(subscribers.find(peer_id)->second));
this->subscribers.erase(peer_id);
}
}

std::future<void> OneToManyProcessor::deleteAsync(std::shared_ptr<MediaStream> stream) {
auto promise = std::make_shared<std::promise<void>>();
if (stream) {
stream->getWorker()->task([promise, stream] {
stream->close();
promise->set_value();
});
} else {
promise->set_value();
}
return promise->get_future();
}

void OneToManyProcessor::close() {
closeAll();
}

void OneToManyProcessor::closeAll() {
ELOG_DEBUG("OneToManyProcessor closeAll");
feedbackSink_ = nullptr;
if (publisher.get()) {
std::future<void> future = deleteAsync(std::dynamic_pointer_cast<MediaStream>(publisher));
future.wait();
}
publisher.reset();
boost::unique_lock<boost::mutex> lock(monitor_mutex_);
std::map<std::string, std::shared_ptr<MediaSink>>::iterator it = subscribers.begin();
while (it != subscribers.end()) {
if ((*it).second != nullptr) {
FeedbackSource* fbsource = (*it).second->getFeedbackSource();
std::future<void> future = deleteAsync(std::dynamic_pointer_cast<MediaStream>((*it).second));
future.wait();
if (fbsource != nullptr) {
fbsource->setFeedbackSink(nullptr);
}
Expand Down
1 change: 0 additions & 1 deletion erizo/src/erizo/OneToManyProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class OneToManyProcessor : public MediaSink, public FeedbackSink {
int deliverVideoData_(std::shared_ptr<DataPacket> video_packet) override;
int deliverFeedback_(std::shared_ptr<DataPacket> fb_packet) override;
int deliverEvent_(MediaEventPtr event) override;
std::future<void> deleteAsync(std::shared_ptr<MediaStream> connection);
void closeAll();
};

Expand Down
9 changes: 9 additions & 0 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,15 @@ WebRTCEvent WebRtcConnection::getCurrentState() {
}

void WebRtcConnection::write(std::shared_ptr<DataPacket> packet) {
asyncTask([packet] (std::shared_ptr<WebRtcConnection> connection) {
connection->syncWrite(packet);
});
}

void WebRtcConnection::syncWrite(std::shared_ptr<DataPacket> packet) {
if (!sending_) {
return;
}
Transport *transport = (bundle_ || packet->type == VIDEO_PACKET) ? video_transport_.get() : audio_transport_.get();
if (transport == nullptr) {
return;
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ class WebRtcConnection: public TransportListener, public LogContext,

void setMetadata(std::map<std::string, std::string> metadata);

void read(std::shared_ptr<DataPacket> packet);
void write(std::shared_ptr<DataPacket> packet);
void syncWrite(std::shared_ptr<DataPacket> packet);

void asyncTask(std::function<void(std::shared_ptr<WebRtcConnection>)> f);

Expand Down
5 changes: 3 additions & 2 deletions erizo/src/test/utils/Mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ class MockWebRtcConnection: public WebRtcConnection {

class MockMediaStream: public MediaStream {
public:
MockMediaStream(std::shared_ptr<WebRtcConnection> connection, const std::string& media_stream_id,
MockMediaStream(std::shared_ptr<Worker> worker, std::shared_ptr<WebRtcConnection> connection,
const std::string& media_stream_id,
std::vector<RtpMap> rtp_mappings) :
MediaStream(connection, media_stream_id) {
MediaStream(worker, connection, media_stream_id) {
local_sdp_ = std::make_shared<SdpInfo>(rtp_mappings);
remote_sdp_ = std::make_shared<SdpInfo>(rtp_mappings);
}
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/test/utils/Tools.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ class BaseHandlerTest {
io_worker = std::make_shared<erizo::IOWorker>();
io_worker->start();
connection = std::make_shared<erizo::MockWebRtcConnection>(simulated_worker, io_worker, ice_config, rtp_maps);
media_stream = std::make_shared<erizo::MockMediaStream>(connection, "", rtp_maps);
media_stream = std::make_shared<erizo::MockMediaStream>(simulated_worker, connection, "", rtp_maps);
processor = std::make_shared<erizo::MockRtcpProcessor>();
quality_manager = std::make_shared<erizo::MockQualityManager>();
packet_buffer_service = std::make_shared<erizo::PacketBufferService>();
Expand Down
14 changes: 10 additions & 4 deletions erizoAPI/MediaStream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,26 @@ NAN_MODULE_INIT(MediaStream::Init) {


NAN_METHOD(MediaStream::New) {
if (info.Length() < 2) {
if (info.Length() < 3) {
Nan::ThrowError("Wrong number of arguments");
}

if (info.IsConstructCall()) {
// Invoked as a constructor with 'new MediaStream()'
ThreadPool* thread_pool = Nan::ObjectWrap::Unwrap<ThreadPool>(Nan::To<v8::Object>(info[0]).ToLocalChecked());

WebRtcConnection* connection =
Nan::ObjectWrap::Unwrap<WebRtcConnection>(Nan::To<v8::Object>(info[0]).ToLocalChecked());
Nan::ObjectWrap::Unwrap<WebRtcConnection>(Nan::To<v8::Object>(info[1]).ToLocalChecked());

std::shared_ptr<erizo::WebRtcConnection> wrtc = connection->me;

v8::String::Utf8Value paramId(Nan::To<v8::String>(info[1]).ToLocalChecked());
v8::String::Utf8Value paramId(Nan::To<v8::String>(info[2]).ToLocalChecked());
std::string wrtcId = std::string(*paramId);

std::shared_ptr<erizo::Worker> worker = thread_pool->me->getLessUsedWorker();

MediaStream* obj = new MediaStream();
obj->me = std::make_shared<erizo::MediaStream>(wrtc, wrtcId);
obj->me = std::make_shared<erizo::MediaStream>(worker, wrtc, wrtcId);
obj->msink = obj->me.get();
uv_async_init(uv_default_loop(), &obj->asyncStats_, &MediaStream::statsCallback);
obj->Wrap(info.This());
Expand All @@ -110,6 +115,7 @@ NAN_METHOD(MediaStream::close) {
if (!uv_is_closing(reinterpret_cast<uv_handle_t*>(&obj->asyncStats_))) {
uv_close(reinterpret_cast<uv_handle_t*>(&obj->asyncStats_), NULL);
}
obj->me->close();
obj->me.reset();
}

Expand Down
8 changes: 6 additions & 2 deletions erizo_controller/erizoJS/erizoJSController.js
Original file line number Diff line number Diff line change
Expand Up @@ -411,11 +411,15 @@ exports.ErizoJSController = function (threadPool, ioThreadPool) {
log.debug('message: clearing periodic PLIs for publisher, id: ' + from);
clearInterval (publisher.wrtc.mediaStream.periodicPlis);
}
for (var key in publisher.subscribers) {
var subscriber = publisher.getSubscriber(key);
for (let subscriberKey in publisher.subscribers) {
let subscriber = publisher.getSubscriber(subscriberKey);
log.info('message: Removing subscriber, id: ' + subscriber.wrtcId);
closeWebRtcConnection(subscriber);
}
for (let externalOutputKey in publisher.externalOutputs) {
log.info('message: Removing externalOutput, id ' + externalOutputKey);
publisher.removeExternalOutput(externalOutputKey);
}
closeWebRtcConnection(publisher.wrtc);
publisher.muxer.close(function(message) {
log.info('message: muxer closed succesfully, ' +
Expand Down
10 changes: 5 additions & 5 deletions erizo_controller/erizoJS/models/Publisher.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ function createWrtc(id, threadPool, ioThreadPool, mediaConfiguration) {
return wrtc;
}

function createMediaStream (wrtc, id) {
var mediaStream = new addon.MediaStream(wrtc, id);
function createMediaStream (threadPool, wrtc, id) {
var mediaStream = new addon.MediaStream(threadPool, wrtc, id);
return mediaStream;
}

Expand Down Expand Up @@ -53,7 +53,7 @@ class Source {
var wrtc = createWrtc(wrtcId, this.threadPool, this.ioThreadPool, options.mediaConfiguration);
wrtc.wrtcId = wrtcId;
wrtc.mediaConfiguration = options.mediaConfiguration;
wrtc.mediaStream = createMediaStream(wrtc, id);
wrtc.mediaStream = createMediaStream(this.threadPool, wrtc, id);
wrtc.addMediaStream(wrtc.mediaStream);
this.subscribers[id] = wrtc;
this.muxer.addSubscriber(wrtc.mediaStream, id);
Expand Down Expand Up @@ -179,7 +179,7 @@ class Publisher extends Source {
this.wrtc.wrtcId = id;
this.wrtc.mediaConfiguration = options.mediaConfiguration;

this.wrtc.mediaStream = createMediaStream(this.wrtc, this.wrtc.wrtcId);
this.wrtc.mediaStream = createMediaStream(this.threadPool, this.wrtc, this.wrtc.wrtcId);
this.wrtc.addMediaStream(this.wrtc.mediaStream);

this.minVideoBW = options.minVideoBW;
Expand All @@ -198,7 +198,7 @@ class Publisher extends Source {
return;
}
this.wrtc = createWrtc(this.id, this.threadPool, this.ioThreadPool, this.mediaConfiguration);
this.wrtc.mediaStream = createMediaStream(this.wrtc, this.wrtc.wrtcId);
this.wrtc.mediaStream = createMediaStream(this.threadPool, this.wrtc, this.wrtc.wrtcId);

this.wrtc.mediaStream.setAudioReceiver(this.muxer);
this.wrtc.mediaStream.setVideoReceiver(this.muxer);
Expand Down
2 changes: 1 addition & 1 deletion spine/nativeClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ exports.ErizoNativeConnection = (config) => {
global.config.erizo.turnpass,
global.config.erizo.networkinterface);

mediaStream = new addon.MediaStream(wrtc,
mediaStream = new addon.MediaStream(threadPool, wrtc,
`spine_${configuration.sessionId}`,
JSON.stringify(global.mediaConfig));

Expand Down

0 comments on commit 716a942

Please sign in to comment.