Skip to content

Commit

Permalink
Fix WebRTCConnections not properly waiting for IceConnections to clos…
Browse files Browse the repository at this point in the history
…e. (#1765)
  • Loading branch information
lodoyun authored Oct 27, 2021
1 parent 10414fa commit 273f5ed
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 52 deletions.
24 changes: 13 additions & 11 deletions erizo/src/erizo/DtlsTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ DtlsTransport::DtlsTransport(MediaType med, const std::string &transport_name, c

DtlsTransport::~DtlsTransport() {
if (this->state_ != TRANSPORT_FINISHED) {
this->close();
ELOG_WARN("%s message: Destructor called but transport has not been properly closed", toLog());
}
}

Expand All @@ -133,7 +133,7 @@ void DtlsTransport::start() {
ice_->start();
}

void DtlsTransport::close() {
boost::future<void> DtlsTransport::close() {
ELOG_DEBUG("%s message: closing", toLog());
running_ = false;
if (rtp_timeout_checker_) {
Expand All @@ -142,15 +142,17 @@ void DtlsTransport::close() {
if (rtcp_timeout_checker_) {
rtcp_timeout_checker_->cancel();
}
ice_->close();
if (dtlsRtp) {
dtlsRtp->close();
}
if (dtlsRtcp) {
dtlsRtcp->close();
}
this->state_ = TRANSPORT_FINISHED;
ELOG_DEBUG("%s message: closed", toLog());
std::shared_ptr<DtlsTransport> shared_this = std::dynamic_pointer_cast<DtlsTransport>(shared_from_this());
return ice_->close().then([shared_this] (boost::future<void>) {
if (shared_this->dtlsRtp) {
shared_this->dtlsRtp->close();
}
if (shared_this->dtlsRtcp) {
shared_this->dtlsRtcp->close();
}
shared_this->state_ = TRANSPORT_FINISHED;
ELOG_DEBUG("%s message: closed", shared_this->toLog());
});
}

void DtlsTransport::maybeRestartIce(std::string username, std::string password) {
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/DtlsTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DtlsTransport : dtls::DtlsReceiver, public Transport {
std::string getMyFingerprint() const;
static bool isDtlsPacket(const char* buf, int len);
void start() override;
void close() override;
boost::future<void> close() override;
void maybeRestartIce(std::string username, std::string password) override;
void onIceData(packetPtr packet) override;
void onCandidate(const CandidateInfo &candidate, IceConnection *conn) override;
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/IceConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>
#include <string>
#include <vector>
#include <queue>
Expand Down Expand Up @@ -102,7 +103,7 @@ class IceConnection : public LogContext {
virtual void onData(unsigned int component_id, char* buf, int len) {}
virtual void onData(unsigned int component_id, packetPtr) {}
virtual CandidatePair getSelectedPair() = 0;
virtual void close() = 0;
virtual boost::future<void> close() = 0;
virtual void maybeRestartIce(std::string remote_ufrag, std::string remote_pass) = 0;

virtual void updateIceState(IceState state);
Expand Down
30 changes: 23 additions & 7 deletions erizo/src/erizo/LibNiceConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,17 @@ LibNiceConnection::~LibNiceConnection() {
}
}

void LibNiceConnection::close() {
if (closed_) {
return;
boost::future<void> LibNiceConnection::close() {
if (!closed_) {
auto shared_this = shared_from_this();
return asyncWithFuture([shared_this] (std::shared_ptr<LibNiceConnection> this_ptr) {
shared_this->closeSync();
});
} else {
auto task_promise = std::make_shared<boost::promise<void>>();
task_promise->set_value();
return task_promise->get_future();
}
auto shared_this = shared_from_this();
async([shared_this] (std::shared_ptr<LibNiceConnection> this_ptr) {
shared_this->closeSync();
});
}

void LibNiceConnection::closeSync() {
Expand Down Expand Up @@ -182,6 +185,19 @@ void LibNiceConnection::async(std::function<void(std::shared_ptr<LibNiceConnecti
});
}

boost::future<void> LibNiceConnection::asyncWithFuture(
std::function<void(std::shared_ptr<LibNiceConnection>)> f) {
auto task_promise = std::make_shared<boost::promise<void>>();
std::weak_ptr<LibNiceConnection> weak_this = shared_from_this();
io_worker_->task([weak_this, f, task_promise] {
if (auto this_ptr = weak_this.lock()) {
f(this_ptr);
}
task_promise->set_value();
});
return task_promise->get_future();
}

void LibNiceConnection::start() {
async([] (std::shared_ptr<LibNiceConnection> this_ptr) {
this_ptr->startSync();
Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/LibNiceConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ class LibNiceConnection : public IceConnection, public std::enable_shared_from_t
CandidatePair getSelectedPair() override;
void maybeRestartIce(std::string remote_ufrag, std::string remote_pass) override;
void restartIceSync(std::string remote_ufrag, std::string remote_pass);
void close() override;
boost::future<void> close() override;
void async(std::function<void(std::shared_ptr<LibNiceConnection>)> f);
boost::future<void> asyncWithFuture(std::function<void(std::shared_ptr<LibNiceConnection>)> f);

static std::shared_ptr<IceConnection> create(std::shared_ptr<IOWorker> io_worker, const IceConfig& ice_config);

Expand Down
26 changes: 23 additions & 3 deletions erizo/src/erizo/NicerConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ NicerConnection::NicerConnection(std::shared_ptr<IOWorker> io_worker, std::share
}

NicerConnection::~NicerConnection() {
if (!closed_) {
ELOG_WARN("%s message: Destructor without a previous close", toLog());
}
}

void NicerConnection::async(function<void(std::shared_ptr<NicerConnection>)> f) {
void NicerConnection::async(std::function<void(std::shared_ptr<NicerConnection>)> f) {
std::weak_ptr<NicerConnection> weak_this = shared_from_this();
io_worker_->task([weak_this, f] {
if (auto this_ptr = weak_this.lock()) {
Expand All @@ -165,6 +168,19 @@ void NicerConnection::async(function<void(std::shared_ptr<NicerConnection>)> f)
});
}

boost::future<void> NicerConnection::asyncWithFuture(
std::function<void(std::shared_ptr<NicerConnection>)> f) {
auto task_promise = std::make_shared<boost::promise<void>>();
std::weak_ptr<NicerConnection> weak_this = shared_from_this();
io_worker_->task([weak_this, f, task_promise] {
if (auto this_ptr = weak_this.lock()) {
f(this_ptr);
}
task_promise->set_value();
});
return task_promise->get_future();
}

void NicerConnection::start() {
ufrag_ = getNewUfrag();
upass_ = getNewPwd();
Expand Down Expand Up @@ -604,12 +620,16 @@ void NicerConnection::closeSync() {
closed_ = true;
}

void NicerConnection::close() {
boost::future<void> NicerConnection::close() {
if (!closed_) {
auto shared_this = shared_from_this();
async([shared_this] (std::shared_ptr<NicerConnection> this_ptr) {
return asyncWithFuture([shared_this] (std::shared_ptr<NicerConnection> this_ptr) {
shared_this->closeSync();
});
} else {
auto task_promise = std::make_shared<boost::promise<void>>();
task_promise->set_value();
return task_promise->get_future();
}
}

Expand Down
3 changes: 2 additions & 1 deletion erizo/src/erizo/NicerConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class NicerConnection : public IceConnection, public std::enable_shared_from_thi

void onData(unsigned int component_id, char* buf, int len) override;
CandidatePair getSelectedPair() override;
void close() override;
boost::future<void> close() override;
bool isClosed() { return closed_; }
void maybeRestartIce(std::string remote_ufrag, std::string remote_pass) override;

Expand All @@ -76,6 +76,7 @@ class NicerConnection : public IceConnection, public std::enable_shared_from_thi
void startSync();
void closeSync();
void async(function<void(std::shared_ptr<NicerConnection>)> f);
boost::future<void> asyncWithFuture(std::function<void(std::shared_ptr<NicerConnection>)> f);
void setRemoteCredentialsSync(const std::string& username, const std::string& password);
void addStreamSync(std::string remote_ufrag, std::string remote_pass);

Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class Transport : public std::enable_shared_from_this<Transport>, public IceConn
virtual void write(char* data, int len) = 0;
virtual void processLocalSdp(SdpInfo *localSdp_) = 0;
virtual void start() = 0;
virtual void close() = 0;
virtual boost::future<void> close() = 0;
virtual std::shared_ptr<IceConnection> getIceConnection() { return ice_; }
void setTransportListener(std::weak_ptr<TransportListener> listener) {
transport_listener_ = listener;
Expand Down
13 changes: 8 additions & 5 deletions erizo/src/erizo/UnencryptedTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ UnencryptedTransport::UnencryptedTransport(MediaType med, const std::string &tra

UnencryptedTransport::~UnencryptedTransport() {
if (this->state_ != TRANSPORT_FINISHED) {
this->close();
ELOG_WARN("%s message: Destructor called but transport has not been properly closed", toLog());
}
}

Expand All @@ -51,12 +51,15 @@ void UnencryptedTransport::start() {
ice_->start();
}

void UnencryptedTransport::close() {
boost::future<void> UnencryptedTransport::close() {
std::shared_ptr<UnencryptedTransport> shared_this =
std::dynamic_pointer_cast<UnencryptedTransport>(shared_from_this());
ELOG_DEBUG("%s message: closing", toLog());
running_ = false;
ice_->close();
this->state_ = TRANSPORT_FINISHED;
ELOG_DEBUG("%s message: closed", toLog());
return ice_->close().then([shared_this] (boost::future<void>) {
shared_this->state_ = TRANSPORT_FINISHED;
ELOG_DEBUG("%s message: closed", shared_this->toLog());
});
}

void UnencryptedTransport::maybeRestartIce(std::string username, std::string password) {
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/UnencryptedTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class UnencryptedTransport : public Transport {
virtual ~UnencryptedTransport();
void connectionStateChanged(IceState newState);
void start() override;
void close() override;
boost::future<void> close() override;
void maybeRestartIce(std::string username, std::string password) override;
void onIceData(packetPtr packet) override;
void onCandidate(const CandidateInfo &candidate, IceConnection *conn) override;
Expand Down
51 changes: 34 additions & 17 deletions erizo/src/erizo/WebRtcConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,54 @@ WebRtcConnection::~WebRtcConnection() {
ELOG_DEBUG("%s message: Destructor ended", toLog());
}

void WebRtcConnection::syncClose() {
boost::future<void> WebRtcConnection::syncClose() {
ELOG_DEBUG("%s message: Close called", toLog());
auto shared_this = shared_from_this();
auto close_promise = std::make_shared<boost::promise<void>>();
if (!sending_) {
return;
ELOG_DEBUG("%s message: Already closed, returning", toLog());
close_promise->set_value();
return close_promise->get_future();
}
sending_ = false;
transceivers_.clear();
streams_.clear();
auto close_transport_futures = std::make_shared<std::vector<boost::future<void>>>();
if (video_transport_.get()) {
video_transport_->close();
close_transport_futures->push_back(video_transport_->close());
}
if (audio_transport_.get()) {
audio_transport_->close();
}
global_state_ = CONN_FINISHED;
if (conn_event_listener_ != nullptr) {
conn_event_listener_ = nullptr;
close_transport_futures->push_back(audio_transport_->close());
}
pipeline_initialized_ = false;
pipeline_->close();
pipeline_.reset();

ELOG_DEBUG("%s message: Close ended", toLog());
auto future_when_all = boost::when_all(close_transport_futures->begin(), close_transport_futures->end());
future_when_all.then([shared_this, close_promise](decltype(future_when_all)) {
shared_this->global_state_ = CONN_FINISHED;
if (shared_this->conn_event_listener_ != nullptr) {
shared_this->conn_event_listener_ = nullptr;
}
shared_this->pipeline_initialized_ = false;
shared_this->pipeline_->close();
shared_this->pipeline_.reset();
ELOG_DEBUG("%s message: Close ended", shared_this->toLog());
close_promise->set_value();
});
return close_promise->get_future();
}

boost::future<void> WebRtcConnection::close() {
ELOG_DEBUG("%s message: Async close called", toLog());
std::shared_ptr<WebRtcConnection> shared_this = shared_from_this();
return asyncTask([shared_this] (std::shared_ptr<WebRtcConnection> connection) {
shared_this->syncClose();
std::weak_ptr<WebRtcConnection> weak_this = shared_from_this();
auto task_promise = std::make_shared<boost::promise<void>>();
worker_->task([weak_this, task_promise] {
if (auto this_ptr = weak_this.lock()) {
this_ptr->syncClose().then([task_promise] (boost::future<void>) {
task_promise->set_value();
});
} else {
task_promise->set_value();
}
});
return task_promise->get_future();
}

bool WebRtcConnection::init() {
Expand Down Expand Up @@ -1154,7 +1171,7 @@ void WebRtcConnection::updateState(TransportState state, Transport * transport)
break;
case TRANSPORT_FAILED:
temp = CONN_FAILED;
sending_ = false;
close();
msg = "";
ELOG_ERROR("%s message: Transport Failed, transportType: %s", toLog(), transport->transport_name.c_str() );
break;
Expand Down
2 changes: 1 addition & 1 deletion erizo/src/erizo/WebRtcConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class WebRtcConnection: public TransportListener, public LogContext, public Hand
*/
bool init();
boost::future<void> close();
void syncClose();
boost::future<void> syncClose();

boost::future<void> setRemoteSdpInfo(std::shared_ptr<SdpInfo> sdp);

Expand Down
6 changes: 4 additions & 2 deletions erizo/src/test/utils/Mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class MockTransport: public Transport {
void updateIceState(IceState state, IceConnection *conn) override {
}
void maybeRestartIce(std::string username, std::string password) override {

}
void onIceData(packetPtr packet) override {
}
Expand All @@ -86,7 +85,10 @@ class MockTransport: public Transport {
}
void start() override {
}
void close() override {
boost::future<void> close() override {
boost::promise<void> promise;
promise.set_value();
return promise.get_future();
}
};

Expand Down

0 comments on commit 273f5ed

Please sign in to comment.