Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTC: Fix rtc to rtmp sync timestamp using sender report. #2470

Merged
merged 12 commits into from
Aug 16, 2021
12 changes: 6 additions & 6 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1463,10 +1463,10 @@ srs_error_t SrsRtcPublishStream::on_rtcp_sr(SrsRtcpSR* rtcp)
srs_error_t err = srs_success;
SrsNtp srs_ntp = SrsNtp::to_time_ms(rtcp->get_ntp());

srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u",
rtcp->get_ssrc(), rtcp->get_rtp_ts(), rtcp->get_rtp_send_packets(), rtcp->get_rtp_send_bytes());
srs_verbose("sender report, ssrc_of_sender=%u, rtp_time=%u, sender_packet_count=%u, sender_octec_count=%u, ms=%u",
rtcp->get_ssrc(), rtcp->get_rtp_ts(), rtcp->get_rtp_send_packets(), rtcp->get_rtp_send_bytes(), srs_ntp.system_ms_);

update_send_report_time(rtcp->get_ssrc(), srs_ntp);
update_send_report_time(rtcp->get_ssrc(), srs_ntp, rtcp->get_rtp_ts());

return err;
}
Expand Down Expand Up @@ -1606,16 +1606,16 @@ void SrsRtcPublishStream::update_rtt(uint32_t ssrc, int rtt)
}
}

void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& ntp)
void SrsRtcPublishStream::update_send_report_time(uint32_t ssrc, const SrsNtp& ntp, uint32_t rtp_time)
{
SrsRtcVideoRecvTrack* video_track = get_video_track(ssrc);
if (video_track) {
return video_track->update_send_report_time(ntp);
return video_track->update_send_report_time(ntp, rtp_time);
}

SrsRtcAudioRecvTrack* audio_track = get_audio_track(ssrc);
if (audio_track) {
return audio_track->update_send_report_time(ntp);
return audio_track->update_send_report_time(ntp, rtp_time);
}
}

Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ class SrsRtcPublishStream : public ISrsRtspPacketDecodeHandler
SrsRtcAudioRecvTrack* get_audio_track(uint32_t ssrc);
SrsRtcVideoRecvTrack* get_video_track(uint32_t ssrc);
void update_rtt(uint32_t ssrc, int rtt);
void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp);
void update_send_report_time(uint32_t ssrc, const SrsNtp& ntp, uint32_t rtp_time);
};

// Callback for RTC connection.
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/app/srs_app_rtc_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class SrsRtpRingBuffer;
// \___(no received, in nack list)
// * seq1: The packet is done, we have already got and processed it.
// * seq2,seq3,...,seq10,seq12,seq13: Theses packets are in queue and wait to be processed.
// * seq10: This packet is lost or not received, we will put it in the nack list.
// * seq11: This packet is lost or not received, we will put it in the nack list.
// We store the received packets in ring buffer.
class SrsRtpRingBuffer
{
Expand Down
80 changes: 61 additions & 19 deletions trunk/src/app/srs_app_rtc_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,8 +1301,14 @@ srs_error_t SrsRtmpFromRtcBridger::on_rtp(SrsRtpPacket *pkt)
return err;
}

// Have no received any sender report, can't calculate avsync_time,
// discard it to avoid timestamp problem in live source
if (pkt->get_avsync_time() <= 0) {
return err;
}

if (pkt->is_audio()) {
err = trancode_audio(pkt);
err = transcode_audio(pkt);
} else {
err = packet_video(pkt);
}
Expand All @@ -1316,12 +1322,12 @@ void SrsRtmpFromRtcBridger::on_unpublish()
source_->on_unpublish();
}

srs_error_t SrsRtmpFromRtcBridger::trancode_audio(SrsRtpPacket *pkt)
srs_error_t SrsRtmpFromRtcBridger::transcode_audio(SrsRtpPacket *pkt)
{
srs_error_t err = srs_success;

// to common message.
uint32_t ts = pkt->header.get_timestamp()/(48000/1000);
uint32_t ts = pkt->get_avsync_time();
if (is_first_audio) {
int header_len = 0;
uint8_t* header = NULL;
Expand Down Expand Up @@ -1352,7 +1358,7 @@ srs_error_t SrsRtmpFromRtcBridger::trancode_audio(SrsRtpPacket *pkt)

for (std::vector<SrsAudioFrame *>::iterator it = out_pkts.begin(); it != out_pkts.end(); ++it) {
SrsCommonMessage out_rtmp;
out_rtmp.header.timestamp = (*it)->dts*(48000/1000);
out_rtmp.header.timestamp = (*it)->dts;
packet_aac(&out_rtmp, (*it)->samples[0].bytes, (*it)->samples[0].size, ts, is_first_audio);

if ((err = source_->on_audio(&out_rtmp)) != srs_success) {
Expand Down Expand Up @@ -1398,7 +1404,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video(SrsRtpPacket* src)
cache_video_pkts_[index].in_use = true;
cache_video_pkts_[index].pkt = pkt;
cache_video_pkts_[index].sn = pkt->header.get_sequence();
cache_video_pkts_[index].ts = pkt->header.get_timestamp();
cache_video_pkts_[index].ts = pkt->get_avsync_time();

// check whether to recovery lost packet and can construct a video frame
if (lost_sn_ == pkt->header.get_sequence()) {
Expand Down Expand Up @@ -1435,7 +1441,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket* pkt)
//type_codec1 + avc_type + composition time + fix header + count of sps + len of sps + sps + count of pps + len of pps + pps
int nb_payload = 1 + 1 + 3 + 5 + 1 + 2 + sps->size + 1 + 2 + pps->size;
SrsCommonMessage rtmp;
rtmp.header.initialize_video(nb_payload, pkt->header.get_timestamp() / 90, 1);
rtmp.header.initialize_video(nb_payload, pkt->get_avsync_time(), 1);
rtmp.create_payload(nb_payload);
rtmp.size = nb_payload;
SrsBuffer payload(rtmp.payload, rtmp.size);
Expand Down Expand Up @@ -1463,18 +1469,18 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket* pkt)
}

if (-1 == key_frame_ts_) {
key_frame_ts_ = pkt->header.get_timestamp();
key_frame_ts_ = pkt->get_avsync_time();
header_sn_ = pkt->header.get_sequence();
lost_sn_ = header_sn_ + 1;
// Received key frame and clean cache of old p frame pkts
clear_cached_video();
srs_trace("set ts=%lld, header=%hu, lost=%hu", key_frame_ts_, header_sn_, lost_sn_);
} else if (key_frame_ts_ != pkt->header.get_timestamp()) {
} else if (key_frame_ts_ != pkt->get_avsync_time()) {
//new key frame, clean cache
int64_t old_ts = key_frame_ts_;
uint16_t old_header_sn = header_sn_;
uint16_t old_lost_sn = lost_sn_;
key_frame_ts_ = pkt->header.get_timestamp();
key_frame_ts_ = pkt->get_avsync_time();
header_sn_ = pkt->header.get_sequence();
lost_sn_ = header_sn_ + 1;
clear_cached_video();
Expand All @@ -1486,7 +1492,7 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_key_frame(SrsRtpPacket* pkt)
cache_video_pkts_[index].in_use = true;
cache_video_pkts_[index].pkt = pkt;
cache_video_pkts_[index].sn = pkt->header.get_sequence();
cache_video_pkts_[index].ts = pkt->header.get_timestamp();
cache_video_pkts_[index].ts = pkt->get_avsync_time();

int32_t sn = lost_sn_;
uint16_t tail_sn = 0;
Expand Down Expand Up @@ -1561,12 +1567,12 @@ srs_error_t SrsRtmpFromRtcBridger::packet_video_rtmp(const uint16_t start, const
nb_payload += 1 + 1 + 3;

SrsCommonMessage rtmp;
SrsRtpPacket* header = cache_video_pkts_[cache_index(start)].pkt;
rtmp.header.initialize_video(nb_payload, header->header.get_timestamp() / 90, 1);
SrsRtpPacket* pkt = cache_video_pkts_[cache_index(start)].pkt;
rtmp.header.initialize_video(nb_payload, pkt->get_avsync_time(), 1);
rtmp.create_payload(nb_payload);
rtmp.size = nb_payload;
SrsBuffer payload(rtmp.payload, rtmp.size);
if (header->is_keyframe()) {
if (pkt->is_keyframe()) {
payload.write_1bytes(0x17); // type(4 bits): key frame; code(4bits): avc
key_frame_ts_ = -1;
} else {
Expand Down Expand Up @@ -2205,7 +2211,9 @@ SrsRtcRecvTrack::SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescripti
nack_receiver_ = new SrsRtpNackForReceiver(rtp_queue_, 1000 * 2 / 3);
}

last_sender_report_sys_time = 0;
last_sender_report_rtp_time_ = 0;
last_sender_report_rtp_time1_ = 0;
last_sender_report_sys_time_ = 0;
}

SrsRtcRecvTrack::~SrsRtcRecvTrack()
Expand All @@ -2230,20 +2238,50 @@ void SrsRtcRecvTrack::update_rtt(int rtt)
nack_receiver_->update_rtt(rtt);
}

void SrsRtcRecvTrack::update_send_report_time(const SrsNtp& ntp)
void SrsRtcRecvTrack::update_send_report_time(const SrsNtp& ntp, uint32_t rtp_time)
{
last_sender_report_ntp = ntp;
last_sender_report_ntp1_ = last_sender_report_ntp_;
last_sender_report_rtp_time1_ = last_sender_report_rtp_time_;

last_sender_report_ntp_ = ntp;
last_sender_report_rtp_time_ = rtp_time;

// TODO: FIXME: Use system wall clock.
last_sender_report_sys_time = srs_update_system_time();;
last_sender_report_sys_time_ = srs_update_system_time();
}

int64_t SrsRtcRecvTrack::cal_avsync_time(uint32_t rtp_time)
{
if (last_sender_report_rtp_time1_ <= 0) {
return -1;
}

// WebRTC using sender report to sync audio/video timestamp, because audio video have different timebase,
// typical audio opus is 48000Hz, video is 90000Hz.
// We using two sender report point to calculate avsync timestamp(clock time) with any given rtp timestamp.
// For example, there are two history sender report of audio as below.
// sender_report1: rtp_time1 = 10000, ntp_time1 = 40000
// sender_report : rtp_time = 10960, ntp_time = 40020
// (rtp_time - rtp_time1) / (ntp_time - ntp_time1) = 960 / 20 = 48,
// Now we can calcualte ntp time(ntp_x) of any given rtp timestamp(rtp_x),
// (rtp_x - rtp_time) / (ntp_x - ntp_time) = 48, and then ntp_x = (rtp_x - rtp_time) / 48 + ntp_time;
float sys_time_elapsed = last_sender_report_ntp_.system_ms_ - last_sender_report_ntp1_.system_ms_;
float rtp_time_elpased = last_sender_report_rtp_time_ - last_sender_report_rtp_time1_;
float rate = rtp_time_elpased / sys_time_elapsed;

float delta = (rtp_time - last_sender_report_rtp_time_) / rate;
int64_t avsync_time = (uint32_t)delta + last_sender_report_ntp_.system_ms_;

return avsync_time;
}

srs_error_t SrsRtcRecvTrack::send_rtcp_rr()
{
srs_error_t err = srs_success;

uint32_t ssrc = track_desc_->ssrc_;
const uint64_t& last_time = last_sender_report_sys_time;
if ((err = session_->send_rtcp_rr(ssrc, rtp_queue_, last_time, last_sender_report_ntp)) != srs_success) {
const uint64_t& last_time = last_sender_report_sys_time_;
if ((err = session_->send_rtcp_rr(ssrc, rtp_queue_, last_time, last_sender_report_ntp_)) != srs_success) {
return srs_error_wrap(err, "ssrc=%u, last_time=%" PRId64, ssrc, last_time);
}

Expand Down Expand Up @@ -2348,6 +2386,8 @@ srs_error_t SrsRtcAudioRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt
{
srs_error_t err = srs_success;

pkt->set_avsync_time(cal_avsync_time(pkt->header.get_timestamp()));

if ((err = source->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "source on rtp");
}
Expand Down Expand Up @@ -2406,6 +2446,8 @@ srs_error_t SrsRtcVideoRecvTrack::on_rtp(SrsRtcSource* source, SrsRtpPacket* pkt

pkt->frame_type = SrsFrameTypeVideo;

pkt->set_avsync_time(cal_avsync_time(pkt->header.get_timestamp()));

if ((err = source->on_rtp(pkt)) != srs_success) {
return srs_error_wrap(err, "source on rtp");
}
Expand Down
17 changes: 12 additions & 5 deletions trunk/src/app/srs_app_rtc_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class SrsRtmpFromRtcBridger : public ISrsRtcSourceBridger
virtual srs_error_t on_rtp(SrsRtpPacket *pkt);
virtual void on_unpublish();
private:
srs_error_t trancode_audio(SrsRtpPacket *pkt);
srs_error_t transcode_audio(SrsRtpPacket *pkt);
void packet_aac(SrsCommonMessage* audio, char* data, int len, uint32_t pts, bool is_header);
srs_error_t packet_video(SrsRtpPacket* pkt);
srs_error_t packet_video_key_frame(SrsRtpPacket* pkt);
Expand Down Expand Up @@ -519,9 +519,15 @@ class SrsRtcRecvTrack
// By config, whether no copy.
bool nack_no_copy_;
protected:
// send report ntp and received time.
SrsNtp last_sender_report_ntp;
uint64_t last_sender_report_sys_time;
// Latest sender report ntp and rtp time.
SrsNtp last_sender_report_ntp_;
int64_t last_sender_report_rtp_time_;

// Prev sender report ntp and rtp time.
SrsNtp last_sender_report_ntp1_;
int64_t last_sender_report_rtp_time1_;

uint64_t last_sender_report_sys_time_;
public:
SrsRtcRecvTrack(SrsRtcConnection* session, SrsRtcTrackDescription* stream_descs, bool is_audio);
virtual ~SrsRtcRecvTrack();
Expand All @@ -531,7 +537,8 @@ class SrsRtcRecvTrack
bool has_ssrc(uint32_t ssrc);
uint32_t get_ssrc();
void update_rtt(int rtt);
void update_send_report_time(const SrsNtp& ntp);
void update_send_report_time(const SrsNtp& ntp, uint32_t rtp_time);
int64_t cal_avsync_time(uint32_t rtp_time);
srs_error_t send_rtcp_rr();
srs_error_t send_rtcp_xr_rrtr();
bool set_track_status(bool active);
Expand Down
3 changes: 3 additions & 0 deletions trunk/src/kernel/srs_kernel_rtc_rtp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ SrsRtpPacket::SrsRtpPacket()
frame_type = SrsFrameTypeReserved;
cached_payload_size = 0;
decode_handler = NULL;
avsync_time_ = -1;

++_srs_pps_objs_rtps->sugar;
}
Expand Down Expand Up @@ -835,6 +836,8 @@ SrsRtpPacket* SrsRtpPacket::copy()
// For performance issue, do not copy the unused field.
cp->decode_handler = decode_handler;

cp->avsync_time_ = avsync_time_;

return cp;
}

Expand Down
4 changes: 4 additions & 0 deletions trunk/src/kernel/srs_kernel_rtc_rtp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ class SrsRtpPacket
int cached_payload_size;
// The helper handler for decoder, use RAW payload if NULL.
ISrsRtspPacketDecodeHandler* decode_handler;
private:
int64_t avsync_time_;
public:
SrsRtpPacket();
virtual ~SrsRtpPacket();
Expand Down Expand Up @@ -329,6 +331,8 @@ class SrsRtpPacket
virtual srs_error_t decode(SrsBuffer* buf);
public:
bool is_keyframe();
void set_avsync_time(int64_t avsync_time) { avsync_time_ = avsync_time; }
int64_t get_avsync_time() const { return avsync_time_; }
};

// Single payload data.
Expand Down