From 7b9c52b28357b49028eca82127d9d9c1855d5dd9 Mon Sep 17 00:00:00 2001 From: Winlin Date: Thu, 13 Jun 2024 14:44:09 +0800 Subject: [PATCH] SmartPtr: Support shared ptr for SRT source. (#4084) --- Co-authored-by: Haibo Chen <495810242@qq.com> --- trunk/src/app/srs_app_conn.hpp | 2 +- trunk/src/app/srs_app_rtc_conn.cpp | 4 +-- trunk/src/app/srs_app_rtmp_conn.cpp | 4 +-- trunk/src/app/srs_app_source.cpp | 1 + trunk/src/app/srs_app_srt_conn.cpp | 5 ++- trunk/src/app/srs_app_srt_conn.hpp | 2 +- trunk/src/app/srs_app_srt_source.cpp | 48 ++++++++++++++++------------ trunk/src/app/srs_app_srt_source.hpp | 10 +++--- trunk/src/core/srs_core_autofree.hpp | 2 +- 9 files changed, 42 insertions(+), 36 deletions(-) diff --git a/trunk/src/app/srs_app_conn.hpp b/trunk/src/app/srs_app_conn.hpp index c0e965525f..f9fe28a89c 100644 --- a/trunk/src/app/srs_app_conn.hpp +++ b/trunk/src/app/srs_app_conn.hpp @@ -148,7 +148,7 @@ class SrsSharedResource : public ISrsResource private: SrsSharedPtr ptr_; public: - SrsSharedResource(T* ptr) : ptr_(ptr) { + SrsSharedResource(T* ptr = NULL) : ptr_(ptr) { } SrsSharedResource(const SrsSharedResource& cp) : ptr_(cp.ptr_) { } diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index b243c75f85..243dba6ef9 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -1216,11 +1216,11 @@ srs_error_t SrsRtcPublishStream::initialize(SrsRequest* r, SrsRtcSourceDescripti // Check whether SRT stream is busy. #ifdef SRS_SRT - SrsSrtSource* srt = NULL; bool srt_server_enabled = _srs_config->get_srt_enabled(); bool srt_enabled = _srs_config->get_srt_enabled(r->vhost); if (srt_server_enabled && srt_enabled) { - if ((err = _srs_srt_sources->fetch_or_create(r, &srt)) != srs_success) { + SrsSharedPtr srt; + if ((err = _srs_srt_sources->fetch_or_create(r, srt)) != srs_success) { return srs_error_wrap(err, "create source"); } diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp index 9478e3358c..6e824c3b0f 100644 --- a/trunk/src/app/srs_app_rtmp_conn.cpp +++ b/trunk/src/app/srs_app_rtmp_conn.cpp @@ -1102,11 +1102,11 @@ srs_error_t SrsRtmpConn::acquire_publish(SrsLiveSource* source) // Check whether SRT stream is busy. #ifdef SRS_SRT - SrsSrtSource* srt = NULL; bool srt_server_enabled = _srs_config->get_srt_enabled(); bool srt_enabled = _srs_config->get_srt_enabled(req->vhost); if (srt_server_enabled && srt_enabled && !info->edge) { - if ((err = _srs_srt_sources->fetch_or_create(req, &srt)) != srs_success) { + SrsSharedPtr srt; + if ((err = _srs_srt_sources->fetch_or_create(req, srt)) != srs_success) { return srs_error_wrap(err, "create source"); } diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 86128d22ed..7e6368d727 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1853,6 +1853,7 @@ srs_error_t SrsLiveSourceManager::notify(int event, srs_utime_t interval, srs_ut return srs_error_wrap(err, "source=%s/%s cycle", source->source_id().c_str(), source->pre_source_id().c_str()); } + // See SrsSrtSource::on_consumer_destroy // TODO: FIXME: support source cleanup. // @see https://github.com/ossrs/srs/issues/713 #if 0 diff --git a/trunk/src/app/srs_app_srt_conn.cpp b/trunk/src/app/srs_app_srt_conn.cpp index 8306bc34af..87897dd1e9 100644 --- a/trunk/src/app/srs_app_srt_conn.cpp +++ b/trunk/src/app/srs_app_srt_conn.cpp @@ -152,7 +152,7 @@ srs_error_t SrsSrtRecvThread::get_recv_err() return srs_error_copy(recv_err_); } -SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port) +SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, std::string ip, int port) : srt_source_(new SrsSrtSource()) { // Create a identify for this client. _srs_context->set_id(_srs_context->generate_id()); @@ -171,7 +171,6 @@ SrsMpegtsSrtConn::SrsMpegtsSrtConn(SrsSrtServer* srt_server, srs_srt_t srt_fd, s trd_ = new SrsSTCoroutine("ts-srt", this, _srs_context->get_id()); - srt_source_ = NULL; req_ = new SrsRequest(); req_->ip = ip; @@ -285,7 +284,7 @@ srs_error_t SrsMpegtsSrtConn::do_cycle() srs_trace("@srt, streamid=%s, stream_url=%s, vhost=%s, app=%s, stream=%s, param=%s", streamid.c_str(), req_->get_stream_url().c_str(), req_->vhost.c_str(), req_->app.c_str(), req_->stream.c_str(), req_->param.c_str()); - if ((err = _srs_srt_sources->fetch_or_create(req_, &srt_source_)) != srs_success) { + if ((err = _srs_srt_sources->fetch_or_create(req_, srt_source_)) != srs_success) { return srs_error_wrap(err, "fetch srt source"); } diff --git a/trunk/src/app/srs_app_srt_conn.hpp b/trunk/src/app/srs_app_srt_conn.hpp index 71833985f1..bc3aab970b 100644 --- a/trunk/src/app/srs_app_srt_conn.hpp +++ b/trunk/src/app/srs_app_srt_conn.hpp @@ -123,7 +123,7 @@ class SrsMpegtsSrtConn : public ISrsConnection, public ISrsStartable, public ISr SrsCoroutine* trd_; SrsRequest* req_; - SrsSrtSource* srt_source_; + SrsSharedPtr srt_source_; SrsSecurity* security_; }; diff --git a/trunk/src/app/srs_app_srt_source.cpp b/trunk/src/app/srs_app_srt_source.cpp index 032bda5cc9..ea915dfb22 100644 --- a/trunk/src/app/srs_app_srt_source.cpp +++ b/trunk/src/app/srs_app_srt_source.cpp @@ -102,7 +102,7 @@ SrsSrtSourceManager::~SrsSrtSourceManager() srs_mutex_destroy(lock); } -srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** pps) +srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSharedPtr& pps) { srs_error_t err = srs_success; @@ -110,48 +110,44 @@ srs_error_t SrsSrtSourceManager::fetch_or_create(SrsRequest* r, SrsSrtSource** p // @bug https://github.com/ossrs/srs/issues/1230 SrsLocker(lock); - SrsSrtSource* source = NULL; - if ((source = fetch(r)) != NULL) { + string stream_url = r->get_stream_url(); + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); + if (it != pool.end()) { + SrsSharedPtr source = it->second; + // we always update the request of resource, // for origin auth is on, the token in request maybe invalid, // and we only need to update the token of request, it's simple. source->update_auth(r); - *pps = source; + pps = source; + return err; } - string stream_url = r->get_stream_url(); - string vhost = r->vhost; - - // should always not exists for create a source. - srs_assert (pool.find(stream_url) == pool.end()); - + SrsSharedPtr source(new SrsSrtSource()); srs_trace("new srt source, stream_url=%s", stream_url.c_str()); - source = new SrsSrtSource(); if ((err = source->initialize(r)) != srs_success) { return srs_error_wrap(err, "init source %s", r->get_stream_url().c_str()); } pool[stream_url] = source; - - *pps = source; + pps = source; return err; } -SrsSrtSource* SrsSrtSourceManager::fetch(SrsRequest* r) +void SrsSrtSourceManager::eliminate(SrsRequest* r) { - SrsSrtSource* source = NULL; + // Use lock to protect coroutine switch. + // @bug https://github.com/ossrs/srs/issues/1230 + SrsLocker(lock); string stream_url = r->get_stream_url(); - if (pool.find(stream_url) == pool.end()) { - return NULL; + std::map< std::string, SrsSharedPtr >::iterator it = pool.find(stream_url); + if (it != pool.end()) { + pool.erase(it); } - - source = pool[stream_url]; - - return source; } SrsSrtSourceManager* _srs_srt_sources = NULL; @@ -973,6 +969,11 @@ void SrsSrtSource::on_consumer_destroy(SrsSrtConsumer* consumer) if (it != consumers.end()) { it = consumers.erase(it); } + + // Destroy and cleanup source when no publishers and consumers. + if (can_publish_ && consumers.empty()) { + _srs_srt_sources->eliminate(req); + } } bool SrsSrtSource::can_publish() @@ -1026,6 +1027,11 @@ void SrsSrtSource::on_unpublish() bridge_->on_unpublish(); srs_freep(bridge_); } + + // Destroy and cleanup source when no publishers and consumers. + if (can_publish_ && consumers.empty()) { + _srs_srt_sources->eliminate(req); + } } srs_error_t SrsSrtSource::on_packet(SrsSrtPacket* packet) diff --git a/trunk/src/app/srs_app_srt_source.hpp b/trunk/src/app/srs_app_srt_source.hpp index 80ea4b10f0..edbfa45ae7 100644 --- a/trunk/src/app/srs_app_srt_source.hpp +++ b/trunk/src/app/srs_app_srt_source.hpp @@ -15,6 +15,7 @@ #include #include #include +#include class SrsSharedPtrMessage; class SrsRequest; @@ -50,7 +51,7 @@ class SrsSrtSourceManager { private: srs_mutex_t lock; - std::map pool; + std::map< std::string, SrsSharedPtr > pool; public: SrsSrtSourceManager(); virtual ~SrsSrtSourceManager(); @@ -58,10 +59,9 @@ class SrsSrtSourceManager // create source when fetch from cache failed. // @param r the client request. // @param pps the matched source, if success never be NULL. - virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSrtSource** pps); -public: - // Get the exists source, NULL when not exists. - virtual SrsSrtSource* fetch(SrsRequest* r); + virtual srs_error_t fetch_or_create(SrsRequest* r, SrsSharedPtr& pps); + // Dispose and destroy the source. + virtual void eliminate(SrsRequest* r); }; // Global singleton instance. diff --git a/trunk/src/core/srs_core_autofree.hpp b/trunk/src/core/srs_core_autofree.hpp index 28f7662767..a84aaeca1f 100644 --- a/trunk/src/core/srs_core_autofree.hpp +++ b/trunk/src/core/srs_core_autofree.hpp @@ -98,7 +98,7 @@ class SrsSharedPtr uint32_t* ref_count_; public: // Create a shared ptr with the object. - SrsSharedPtr(T* ptr) { + SrsSharedPtr(T* ptr = NULL) { ptr_ = ptr; ref_count_ = new uint32_t(1); }