From d6c16a7e236e03eba754c763e865464ec82d4516 Mon Sep 17 00:00:00 2001 From: winlin Date: Wed, 24 Mar 2021 20:01:02 +0800 Subject: [PATCH] RTC: Support WebRTC re-publish stream. 4.0.87 --- README.md | 1 + trunk/src/app/srs_app_rtc_conn.cpp | 27 +++++++++++++++ trunk/src/app/srs_app_rtc_conn.hpp | 9 +++-- trunk/src/app/srs_app_rtc_source.cpp | 49 +++++++++++++++++++++------- trunk/src/app/srs_app_rtc_source.hpp | 25 ++++++++++++-- trunk/src/core/srs_core_version4.hpp | 2 +- 6 files changed, 94 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 89646ae613..7875a5b096 100755 --- a/README.md +++ b/README.md @@ -190,6 +190,7 @@ Other documents: ## V4 changes +* v4.0, 2021-03-24, RTC: Support WebRTC re-publish stream. 4.0.87 * v4.0, 2021-03-24, RTC: Use fast parse TWCCID, ignore in packet parsing. 4.0.86 * v4.0, 2021-03-09, DTLS: Fix ARQ bug, use openssl timeout. 4.0.84 * v4.0, 2021-03-08, DTLS: Fix dead loop by duplicated Alert message. 4.0.83 diff --git a/trunk/src/app/srs_app_rtc_conn.cpp b/trunk/src/app/srs_app_rtc_conn.cpp index 5dceaabb8e..e28ab2f427 100644 --- a/trunk/src/app/srs_app_rtc_conn.cpp +++ b/trunk/src/app/srs_app_rtc_conn.cpp @@ -466,6 +466,30 @@ srs_error_t SrsRtcPlayStream::initialize(SrsRequest* req, std::mapaudio_track_desc_ && audio_tracks_.size() == 1) { + uint32_t ssrc = desc->audio_track_desc_->ssrc_; + SrsRtcAudioSendTrack* track = audio_tracks_.begin()->second; + + audio_tracks_.clear(); + audio_tracks_.insert(make_pair(ssrc, track)); + } + + // Refresh the relation for video. + // TODO: FIMXE: Match by label? + if (desc->video_track_descs_.size() == 1 && desc->video_track_descs_.size() == 1) { + SrsRtcTrackDescription* vdesc = desc->video_track_descs_.at(0); + uint32_t ssrc = vdesc->ssrc_; + SrsRtcVideoSendTrack* track = video_tracks_.begin()->second; + + video_tracks_.clear(); + video_tracks_.insert(make_pair(ssrc, track)); + } +} + srs_error_t SrsRtcPlayStream::on_reload_vhost_play(string vhost) { if (req_->vhost != vhost) { @@ -546,6 +570,9 @@ srs_error_t SrsRtcPlayStream::cycle() return srs_error_wrap(err, "create consumer, source=%s", req_->get_stream_url().c_str()); } + srs_assert(consumer); + consumer->set_handler(this); + // TODO: FIXME: Dumps the SPS/PPS from gop cache, without other frames. if ((err = source->consumer_dumps(consumer)) != srs_success) { return srs_error_wrap(err, "dumps consumer, url=%s", req_->get_stream_url().c_str()); diff --git a/trunk/src/app/srs_app_rtc_conn.hpp b/trunk/src/app/srs_app_rtc_conn.hpp index 767d6b0dd3..12187fae2a 100644 --- a/trunk/src/app/srs_app_rtc_conn.hpp +++ b/trunk/src/app/srs_app_rtc_conn.hpp @@ -211,7 +211,7 @@ class SrsRtcPLIWorker : virtual public ISrsCoroutineHandler // A RTC play stream, client pull and play stream from SRS. class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISrsReloadHandler - , virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler + , virtual public ISrsHourGlass, virtual public ISrsRtcPLIWorkerHandler, public ISrsRtcStreamChangeCallback { private: SrsContextId cid_; @@ -235,13 +235,16 @@ class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISr bool nack_enabled_; bool nack_no_copy_; private: - // Whether palyer started. + // Whether player started. bool is_started; public: SrsRtcPlayStream(SrsRtcConnection* s, const SrsContextId& cid); virtual ~SrsRtcPlayStream(); public: srs_error_t initialize(SrsRequest* request, std::map sub_relations); +// Interface ISrsRtcStreamChangeCallback +public: + void on_stream_change(SrsRtcStreamDescription* desc); // interface ISrsReloadHandler public: virtual srs_error_t on_reload_vhost_play(std::string vhost); @@ -268,7 +271,7 @@ class SrsRtcPlayStream : virtual public ISrsCoroutineHandler, virtual public ISr srs_error_t on_rtcp_ps_feedback(SrsRtcpPsfbCommon* rtcp); srs_error_t on_rtcp_rr(SrsRtcpRR* rtcp); uint32_t get_video_publish_ssrc(uint32_t play_ssrc); -// inteface ISrsRtcPLIWorkerHandler +// Interface ISrsRtcPLIWorkerHandler public: virtual srs_error_t do_request_keyframe(uint32_t ssrc, SrsContextId cid); }; diff --git a/trunk/src/app/srs_app_rtc_source.cpp b/trunk/src/app/srs_app_rtc_source.cpp index d111d5e577..e9a34533a8 100644 --- a/trunk/src/app/srs_app_rtc_source.cpp +++ b/trunk/src/app/srs_app_rtc_source.cpp @@ -152,10 +152,19 @@ SrsNtp SrsNtp::to_time_ms(uint64_t ntp) return srs_ntp; } +ISrsRtcStreamChangeCallback::ISrsRtcStreamChangeCallback() +{ +} + +ISrsRtcStreamChangeCallback::~ISrsRtcStreamChangeCallback() +{ +} + SrsRtcConsumer::SrsRtcConsumer(SrsRtcStream* s) { source = s; should_update_source_id = false; + handler_ = NULL; mw_wait = srs_cond_new(); mw_min_msgs = 0; @@ -231,6 +240,13 @@ void SrsRtcConsumer::wait(int nb_msgs) srs_cond_wait(mw_wait); } +void SrsRtcConsumer::on_stream_change(SrsRtcStreamDescription* desc) +{ + if (handler_) { + handler_->on_stream_change(desc); + } +} + SrsRtcStreamManager::SrsRtcStreamManager() { lock = NULL; @@ -354,24 +370,34 @@ void SrsRtcStream::update_auth(SrsRequest* r) req->update_auth(r); } -srs_error_t SrsRtcStream::on_source_id_changed(SrsContextId id) +srs_error_t SrsRtcStream::on_source_changed() { srs_error_t err = srs_success; - if (!_source_id.compare(id)) { - return err; - } + // Update context id if changed. + bool id_changed = false; + const SrsContextId& id = _srs_context->get_id(); + if (_source_id.compare(id)) { + id_changed = true; - if (_pre_source_id.empty()) { - _pre_source_id = id; + if (_pre_source_id.empty()) { + _pre_source_id = id; + } + _source_id = id; } - _source_id = id; - // notice all consumer + // Notify all consumers. std::vector::iterator it; for (it = consumers.begin(); it != consumers.end(); ++it) { SrsRtcConsumer* consumer = *it; - consumer->update_source_id(); + + // Notify if context id changed. + if (id_changed) { + consumer->update_source_id(); + } + + // Notify about stream description. + consumer->on_stream_change(stream_desc_); } return err; @@ -456,9 +482,8 @@ srs_error_t SrsRtcStream::on_publish() is_created_ = true; is_delivering_packets_ = true; - // whatever, the publish thread is the source or edge source, - // save its id to srouce id. - if ((err = on_source_id_changed(_srs_context->get_id())) != srs_success) { + // Notify the consumers about stream change event. + if ((err = on_source_changed()) != srs_success) { return srs_error_wrap(err, "source id change"); } diff --git a/trunk/src/app/srs_app_rtc_source.hpp b/trunk/src/app/srs_app_rtc_source.hpp index f22bffc274..1ea9c5a419 100644 --- a/trunk/src/app/srs_app_rtc_source.hpp +++ b/trunk/src/app/srs_app_rtc_source.hpp @@ -75,6 +75,17 @@ class SrsNtp static uint64_t kMagicNtpFractionalUnit; }; +// When RTC stream publish and re-publish. +class ISrsRtcStreamChangeCallback +{ +public: + ISrsRtcStreamChangeCallback(); + virtual ~ISrsRtcStreamChangeCallback(); +public: + virtual void on_stream_change(SrsRtcStreamDescription* desc) = 0; +}; + +// The RTC stream consumer, consume packets from RTC stream source. class SrsRtcConsumer { private: @@ -87,6 +98,9 @@ class SrsRtcConsumer srs_cond_t mw_wait; bool mw_waiting; int mw_min_msgs; +private: + // The callback for stream change event. + ISrsRtcStreamChangeCallback* handler_; public: SrsRtcConsumer(SrsRtcStream* s); virtual ~SrsRtcConsumer(); @@ -100,6 +114,9 @@ class SrsRtcConsumer virtual srs_error_t dump_packet(SrsRtpPacket2** ppkt); // Wait for at-least some messages incoming in queue. virtual void wait(int nb_msgs); +public: + void set_handler(ISrsRtcStreamChangeCallback* h) { handler_ = h; } // SrsRtcConsumer::set_handler() + void on_stream_change(SrsRtcStreamDescription* desc); }; class SrsRtcStreamManager @@ -154,7 +171,7 @@ class SrsRtcStream // For publish, it's the publish client id. // For edge, it's the edge ingest id. // when source id changed, for example, the edge reconnect, - // invoke the on_source_id_changed() to let all clients know. + // invoke the on_source_changed() to let all clients know. SrsContextId _source_id; // previous source id. SrsContextId _pre_source_id; @@ -180,8 +197,10 @@ class SrsRtcStream virtual srs_error_t initialize(SrsRequest* r); // Update the authentication information in request. virtual void update_auth(SrsRequest* r); - // The source id changed. - virtual srs_error_t on_source_id_changed(SrsContextId id); +private: + // The stream source changed. + virtual srs_error_t on_source_changed(); +public: // Get current source id. virtual SrsContextId source_id(); virtual SrsContextId pre_source_id(); diff --git a/trunk/src/core/srs_core_version4.hpp b/trunk/src/core/srs_core_version4.hpp index ef82d18866..2add34d5bd 100644 --- a/trunk/src/core/srs_core_version4.hpp +++ b/trunk/src/core/srs_core_version4.hpp @@ -26,6 +26,6 @@ #define VERSION_MAJOR 4 #define VERSION_MINOR 0 -#define VERSION_REVISION 86 +#define VERSION_REVISION 87 #endif