From 92c0c723a64b1b0693d54e294116eaa37f1ac569 Mon Sep 17 00:00:00 2001 From: q191201771 <191201771@qq.com> Date: Sat, 12 Mar 2022 19:12:54 +0800 Subject: [PATCH] =?UTF-8?q?[perf]=20mpegts:=20=E4=BC=98=E5=8C=96=E8=BD=AC?= =?UTF-8?q?=E6=8D=A2mpegts=E7=9A=84=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 2 + pkg/aac/aac.go | 2 +- pkg/hls/muxer.go | 9 ++- pkg/hls/var.go | 4 -- pkg/logic/group__core_streaming.go | 85 +++++++++++++---------- pkg/logic/group__in.go | 10 +-- pkg/mpegts/pack.go | 53 +++++++------- pkg/remux/rtmp2mpegts.go | 108 ++++++++++++++++------------- 8 files changed, 152 insertions(+), 121 deletions(-) diff --git a/Makefile b/Makefile index 4f40838c..0d66f37c 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,8 @@ image: .PHONY: clean clean: rm -rf ./bin ./lal_record ./logs coverage.txt + find ./pkg -name 'lal_record' | xargs rm -rf + find ./pkg -name 'logs' | xargs rm -rf .PHONY: all all: build test diff --git a/pkg/aac/aac.go b/pkg/aac/aac.go index fcfd1281..df47c64a 100644 --- a/pkg/aac/aac.go +++ b/pkg/aac/aac.go @@ -174,7 +174,7 @@ func (ascCtx *AscContext) GetSamplingFrequency() (int, error) { case AscSamplingFrequencyIndex44100: return 44100, nil } - return -1, fmt.Errorf("%w. index=%d", base.ErrSamplingFrequencyIndex, ascCtx.SamplingFrequencyIndex) + return -1, fmt.Errorf("%w. asCtx=%+v", base.ErrSamplingFrequencyIndex, ascCtx) } type AdtsHeaderContext struct { diff --git a/pkg/hls/muxer.go b/pkg/hls/muxer.go index 86dc842d..2026caeb 100644 --- a/pkg/hls/muxer.go +++ b/pkg/hls/muxer.go @@ -20,7 +20,7 @@ import ( ) type MuxerObserver interface { - FlushAudio() + OnFragmentOpen() } // MuxerConfig @@ -43,7 +43,7 @@ const ( // Muxer // -// 输入rtmp流,转出hls(m3u8+ts)至文件中,并回调给上层转出ts流 +// 输入mpegts流,转出hls(m3u8+ts)至文件中 // type Muxer struct { UniqueKey string @@ -122,7 +122,6 @@ func (m *Muxer) Start() { func (m *Muxer) Dispose() { Log.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey) - m.observer.FlushAudio() if err := m.closeFragment(true); err != nil { Log.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err) } @@ -132,7 +131,7 @@ func (m *Muxer) Dispose() { // OnPatPmt OnTsPackets // -// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接挂载 remux.Rtmp2MpegtsRemuxer 接收rtmp转Mpegts的数据 +// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接将 remux.Rtmp2MpegtsRemuxer 的数据喂入 hls.Muxer // func (m *Muxer) OnPatPmt(b []byte) { m.FeedPatPmt(b) @@ -293,7 +292,7 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error { m.fragTs = ts // nrm said: start fragment with audio to make iPhone happy - m.observer.FlushAudio() + m.observer.OnFragmentOpen() return nil } diff --git a/pkg/hls/var.go b/pkg/hls/var.go index 55bd457e..322a3d0b 100644 --- a/pkg/hls/var.go +++ b/pkg/hls/var.go @@ -20,7 +20,3 @@ var ( Log = nazalog.GetGlobalLogger() ) - -var ( - calcFragmentHeaderQueueSize = 16 -) diff --git a/pkg/logic/group__core_streaming.go b/pkg/logic/group__core_streaming.go index b4de6a8b..b17e458b 100644 --- a/pkg/logic/group__core_streaming.go +++ b/pkg/logic/group__core_streaming.go @@ -22,38 +22,30 @@ import ( ) // group__streaming.go +// // 包含group中音视频数据转发、转封装协议的逻辑 +// // --------------------------------------------------------------------------------------------------------------------- -// 输入rtmp类型的数据 -// -// OnReadRtmpAvMsg 来自 rtmp.ServerSession(Pub), rtmp.PullSession, remux.DummyAudioFilter 的回调 + +// OnReadRtmpAvMsg // -// onRtmpMsgFromRemux 来自内部协议转换 +// 输入rtmp数据. +// 来自 rtmp.ServerSession(Pub), rtmp.PullSession, (remux.DummyAudioFilter) 的回调. // -// --------------------------------------------------------------------------------------------------------------------- - -// OnReadRtmpAvMsg ... func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) { group.mutex.Lock() defer group.mutex.Unlock() group.broadcastByRtmpMsg(msg) } -func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) { - group.broadcastByRtmpMsg(msg) -} - // --------------------------------------------------------------------------------------------------------------------- -// rtp类型以及rtp组成帧之后的数据 -// -// OnSdp, OnRtpPacket, OnAvPacket 来自 rtsp.PubSession 的回调 + +// OnSdp OnRtpPacket OnAvPacket // -// onSdpFromRemux, onRtpPacketFromRemux 来自内部协议转换 +// 输入rtsp(rtp)和rtp合帧之后的数据. +// 来自 rtsp.PubSession 的回调. // -// --------------------------------------------------------------------------------------------------------------------- - -// OnSdp ... func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { group.mutex.Lock() defer group.mutex.Unlock() @@ -62,11 +54,6 @@ func (group *Group) OnSdp(sdpCtx sdp.LogicContext) { group.rtsp2RtmpRemuxer.OnSdp(sdpCtx) } -// onSdpFromRemux ... -func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) { - group.sdpCtx = &sdpCtx -} - // OnRtpPacket ... func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { group.mutex.Lock() @@ -74,11 +61,6 @@ func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) { group.feedRtpPacket(pkt) } -// onRtpPacketFromRemux ... -func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) { - group.feedRtpPacket(pkt) -} - // OnAvPacket ... func (group *Group) OnAvPacket(pkt base.AvPacket) { group.mutex.Lock() @@ -87,13 +69,12 @@ func (group *Group) OnAvPacket(pkt base.AvPacket) { } // --------------------------------------------------------------------------------------------------------------------- -// mpegts类型的数据 + +// OnPatPmt OnTsPackets // -// OnPatPmt, OnTsPackets 来自 Rtmp2MpegtsRemuxerObserver 的回调 +// 输入mpegts数据. +// 来自 remux.Rtmp2MpegtsRemuxer 的回调. // -// --------------------------------------------------------------------------------------------------------------------- - -// OnPatPmt ... func (group *Group) OnPatPmt(b []byte) { group.patpmt = b @@ -111,8 +92,40 @@ func (group *Group) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary group.feedTsPackets(tsPackets, frame, boundary) } -func (group *Group) FlushAudio() { - //to be continued +// --------------------------------------------------------------------------------------------------------------------- + +// onRtmpMsgFromRemux +// +// 输入rtmp数据. +// 来自 remux.AvPacket2RtmpRemuxer 的回调. +// +func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) { + group.broadcastByRtmpMsg(msg) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// onSdpFromRemux onRtpPacketFromRemux +// +// 输入rtsp(rtp)数据. +// 来自 remux.Rtmp2RtspRemuxer 的回调. +// +func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) { + group.sdpCtx = &sdpCtx +} + +// onRtpPacketFromRemux ... +func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) { + group.feedRtpPacket(pkt) +} + +// --------------------------------------------------------------------------------------------------------------------- + +// OnFragmentOpen +// +// 来自 hls.Muxer 的回调 +// +func (group *Group) OnFragmentOpen() { group.rtmp2MpegtsRemuxer.FlushAudio() } @@ -382,7 +395,7 @@ func (group *Group) feedRtpPacket(pkt rtprtcp.RtpPacket) { // --------------------------------------------------------------------------------------------------------------------- func (group *Group) feedTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) { - // TODO(chef): [opt] 重构remux 2 ts后,hls的输入必须放在http ts的输入之前,保证hls重新切片时可以先flush audio + // 注意,hls的处理放在前面,让hls先判断是否打开新的fragment并flush audio if group.hlsMuxer != nil { group.hlsMuxer.FeedMpegts(tsPackets, frame, boundary) } diff --git a/pkg/logic/group__in.go b/pkg/logic/group__in.go index 0c45f9ec..2c3fb028 100644 --- a/pkg/logic/group__in.go +++ b/pkg/logic/group__in.go @@ -165,6 +165,12 @@ func (group *Group) addIn() { // delIn 有pub或pull的输入型session离开时,需要调用该函数 // func (group *Group) delIn() { + // 注意,remuxer放前面,使得有机会将内部缓存的数据吐出来 + if group.rtmp2MpegtsRemuxer != nil { + group.rtmp2MpegtsRemuxer.Dispose() + group.rtmp2MpegtsRemuxer = nil + } + group.stopPushIfNeeded() group.stopHlsIfNeeded() group.stopRecordFlvIfNeeded() @@ -174,10 +180,6 @@ func (group *Group) delIn() { group.rtspPubSession = nil group.rtsp2RtmpRemuxer = nil group.rtmp2RtspRemuxer = nil - if group.rtmp2MpegtsRemuxer != nil { - group.rtmp2MpegtsRemuxer.FlushAudio() // TODO(chef): [refactor] - group.rtmp2MpegtsRemuxer = nil - } group.dummyAudioFilter = nil group.rtmpGopCache.Clear() group.httpflvGopCache.Clear() diff --git a/pkg/mpegts/pack.go b/pkg/mpegts/pack.go index e777a0f7..f7a99fc2 100644 --- a/pkg/mpegts/pack.go +++ b/pkg/mpegts/pack.go @@ -8,7 +8,7 @@ package mpegts -// Frame 帧数据 +// Frame 帧数据,用于打包成mpegts格式的数据 // type Frame struct { Pts uint64 // =(毫秒 * 90) @@ -34,33 +34,38 @@ type Frame struct { Raw []byte } -// OnTsPacket @param packet: 188字节大小的TS包,注意,一次Pack对应的多个TSPacket,复用的是一块内存 +// Pack annexb格式的流转换为mpegts流 // -type OnTsPacket func(packet []byte) - -func PackTsPacket(frame *Frame, onTsPacket OnTsPacket) { - packet := make([]byte, 188) - PackToTsPacket(frame, packet, onTsPacket) -} - -// PackToTsPacket Annexb格式的流转换为mpegts packet -// -// @param frame: 各字段含义见mpegts.Frame结构体定义 -// frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保存CC的值,供下次调用时使用 -// frame.Raw 函数调用结束后,内部不会持有该内存块 +// 注意,内部会增加 Frame.Cc 的值. // -// @param packet: 打包ts packet时的写入内存 +// @return: 内存块为独立申请,调度结束后,内部不再持有 // -// @param onTsPacket: 注意,一次函数调用,可能对应多次回调 -// -func PackToTsPacket(frame *Frame, packet []byte, onTsPacket OnTsPacket) { - wpos := 0 // 当前packet的写入位置 - lpos := 0 // 当前帧的处理位置 - rpos := len(frame.Raw) // 当前帧大小 +func (frame *Frame) Pack() []byte { + bufLen := len(frame.Raw) * 2 // 预分配一块足够大的内存 + if bufLen < 188 { + bufLen = 188 + } + buf := make([]byte, bufLen) + + lpos := 0 // 当前输入帧的处理位置 + rpos := len(frame.Raw) // 当前输入帧大小 first := true // 是否为帧的首个packet的标准 + packetPosAtBuf := 0 // 当前输出packet相对于整个输出内存块的位置 for lpos != rpos { - wpos = 0 + + // TODO(chef): CHEFNOTICEME 正常来说,预分配的内存应该是足够用了,我们加个扩容逻辑保证绝对正确性,并且加个日志观察一段时间 + if packetPosAtBuf+188 > len(buf) { + Log.Warnf("buffer too short. frame size=%d, buf=%d, packetPosAtBuf=%d", len(frame.Raw), len(buf), packetPosAtBuf) + newBuf := make([]byte, packetPosAtBuf+188) + copy(newBuf, buf) + buf = newBuf + } + + packet := buf[packetPosAtBuf : packetPosAtBuf+188] // 当前输出packet + wpos := 0 // 当前输出packet的写入位置 + packetPosAtBuf += 188 + frame.Cc++ // 每个packet都需要添加TS Header @@ -228,9 +233,9 @@ func PackToTsPacket(frame *Frame, packet []byte, onTsPacket OnTsPacket) { copy(packet[wpos:], frame.Raw[lpos:lpos+inSize]) lpos = rpos } - - onTsPacket(packet) } + + return buf[:packetPosAtBuf] } // ----- private ------------------------------------------------------------------------------------------------------- diff --git a/pkg/remux/rtmp2mpegts.go b/pkg/remux/rtmp2mpegts.go index cf626249..e43b9238 100644 --- a/pkg/remux/rtmp2mpegts.go +++ b/pkg/remux/rtmp2mpegts.go @@ -10,6 +10,7 @@ package remux import ( "encoding/hex" + "github.com/q191201771/lal/pkg/aac" "github.com/q191201771/lal/pkg/avc" "github.com/q191201771/lal/pkg/base" @@ -19,20 +20,25 @@ import ( "github.com/q191201771/naza/pkg/nazabytes" ) -var calcFragmentHeaderQueueSize = 16 -var maxAudioCacheDelayByAudio uint64 = 150 * 90 // 单位(毫秒*90) -var maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90) +const ( + initialVideoOutBufferSize = 1024 * 1024 + calcFragmentHeaderQueueSize = 16 + maxAudioCacheDelayByAudio uint64 = 150 * 90 // 单位(毫秒*90) + maxAudioCacheDelayByVideo uint64 = 300 * 90 // 单位(毫秒*90) +) type Rtmp2MpegtsRemuxerObserver interface { // OnPatPmt // - // @param b const只读内存块,上层可以持有,但是不允许修改 + // @param b: const只读内存块,上层可以持有,但是不允许修改 // OnPatPmt(b []byte) // OnTsPackets // - // @param tsPackets: mpegts数据,有一个或多个188字节的ts数据组成 + // @param tsPackets: + // - mpegts数据,有一个或多个188字节的ts数据组成. + // - 回调结束后,remux.Rtmp2MpegtsRemuxer 不再使用这块内存块. // // @param frame: 各字段含义见 mpegts.Frame 结构体定义 // @@ -46,10 +52,10 @@ type Rtmp2MpegtsRemuxer struct { observer Rtmp2MpegtsRemuxerObserver filter *rtmp2MpegtsFilter - videoOut []byte // Annexb TODO chef: 优化这块buff + videoOut []byte // Annexb spspps []byte // Annexb 也可能是vps+sps+pps ascCtx *aac.AscContext - audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 TODO chef: 优化这块buff + audioCacheFrames []byte // 缓存音频帧数据,注意,可能包含多个音频帧 audioCacheFirstFramePts uint64 // audioCacheFrames中第一个音频帧的时间戳 TODO chef: rename to DTS audioCc uint8 videoCc uint8 @@ -59,13 +65,13 @@ type Rtmp2MpegtsRemuxer struct { func NewRtmp2MpegtsRemuxer(observer Rtmp2MpegtsRemuxerObserver) *Rtmp2MpegtsRemuxer { uk := base.GenUkRtmp2MpegtsRemuxer() - videoOut := make([]byte, 1024*1024) - videoOut = videoOut[0:0] r := &Rtmp2MpegtsRemuxer{ UniqueKey: uk, observer: observer, - videoOut: videoOut, } + r.audioCacheFrames = nil + r.videoOut = make([]byte, initialVideoOutBufferSize) + r.videoOut = r.videoOut[0:0] r.filter = newRtmp2MpegtsFilter(calcFragmentHeaderQueueSize, r) return r } @@ -78,6 +84,10 @@ func (s *Rtmp2MpegtsRemuxer) FeedRtmpMessage(msg base.RtmpMsg) { s.filter.Push(msg) } +func (s *Rtmp2MpegtsRemuxer) Dispose() { + s.FlushAudio() +} + // --------------------------------------------------------------------------------------------------------------------- // FlushAudio @@ -88,7 +98,7 @@ func (s *Rtmp2MpegtsRemuxer) FeedRtmpMessage(msg base.RtmpMsg) { // 3. 输入流关闭时 // func (s *Rtmp2MpegtsRemuxer) FlushAudio() { - if s.audioCacheFrames == nil { + if s.audioCacheEmpty() { return } @@ -101,26 +111,14 @@ func (s *Rtmp2MpegtsRemuxer) FlushAudio() { frame.Pid = mpegts.PidAudio frame.Sid = mpegts.StreamIdAudio - // 注意,在回调前设置为nil,因为回调中有可能再次调用FlushAudio - s.audioCacheFrames = nil + // 注意,在回调前设置为空,因为回调中有可能再次调用FlushAudio + s.resetAudioCache() s.onFrame(&frame) // 回调结束后更新cc s.audioCc = frame.Cc } -func (s *Rtmp2MpegtsRemuxer) AudioSeqHeaderCached() bool { - return s.ascCtx != nil -} - -func (s *Rtmp2MpegtsRemuxer) VideoSeqHeaderCached() bool { - return s.spspps != nil -} - -func (s *Rtmp2MpegtsRemuxer) AudioCacheEmpty() bool { - return s.audioCacheFrames == nil -} - // --------------------------------------------------------------------------------------------------------------------- // onPatPmt onPop @@ -140,7 +138,7 @@ func (s *Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) { } } -// ----- private ------------------------------------------------------------------------------------------------------- +// --------------------------------------------------------------------------------------------------------------------- func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { // 注意,有一种情况是msg.Payload为 27 02 00 00 00 @@ -176,8 +174,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { audSent := false spsppsSent := false - // 优化这块buffer - out := s.videoOut[0:0] + s.resetVideoOutBuffer() // msg中可能有多个NALU,逐个获取 nals, err := avc.SplitNaluAvcc(msg.Payload[5:]) @@ -210,9 +207,9 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { //if codecId == base.RtmpCodecIdAvc && (nalType == avc.NaluTypeSei || nalType == avc.NaluTypeIdrSlice || nalType == avc.NaluTypeSlice) { switch codecId { case base.RtmpCodecIdAvc: - out = append(out, avc.AudNalu...) + s.videoOut = append(s.videoOut, avc.AudNalu...) case base.RtmpCodecIdHevc: - out = append(out, hevc.AudNalu...) + s.videoOut = append(s.videoOut, hevc.AudNalu...) } audSent = true } @@ -223,7 +220,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { switch nalType { case avc.NaluTypeIdrSlice: if !spsppsSent { - if out, err = s.appendSpsPps(out); err != nil { + if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil { Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey) return } @@ -237,7 +234,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { switch nalType { case hevc.NaluTypeSliceIdr, hevc.NaluTypeSliceIdrNlp, hevc.NaluTypeSliceCranut: if !spsppsSent { - if out, err = s.appendSpsPps(out); err != nil { + if s.videoOut, err = s.appendSpsPps(s.videoOut); err != nil { Log.Warnf("[%s] append spspps by not exist.", s.UniqueKey) return } @@ -251,18 +248,18 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { // 如果写入了aud或spspps,则用start code3,否则start code4。为什么要这样处理 // 这里不知为什么要区分写入两种类型的start code - if len(out) == 0 { - out = append(out, avc.NaluStartCode4...) + if len(s.videoOut) == 0 { + s.videoOut = append(s.videoOut, avc.NaluStartCode4...) } else { - out = append(out, avc.NaluStartCode3...) + s.videoOut = append(s.videoOut, avc.NaluStartCode3...) } - out = append(out, nal...) + s.videoOut = append(s.videoOut, nal...) } dts := uint64(msg.Header.TimestampAbs) * 90 - if s.audioCacheFrames != nil && s.audioCacheFirstFramePts+maxAudioCacheDelayByVideo < dts { + if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByVideo < dts { s.FlushAudio() } @@ -271,7 +268,7 @@ func (s *Rtmp2MpegtsRemuxer) feedVideo(msg base.RtmpMsg) { frame.Dts = dts frame.Pts = frame.Dts + uint64(cts)*90 frame.Key = msg.IsVideoKeyNalu() - frame.Raw = out + frame.Raw = s.videoOut frame.Pid = mpegts.PidVideo frame.Sid = mpegts.StreamIdVideo @@ -297,18 +294,18 @@ func (s *Rtmp2MpegtsRemuxer) feedAudio(msg base.RtmpMsg) { return } - if !s.AudioSeqHeaderCached() { + if !s.audioSeqHeaderCached() { Log.Warnf("[%s] feed audio message but aac seq header not exist.", s.UniqueKey) return } pts := uint64(msg.Header.TimestampAbs) * 90 - if s.audioCacheFrames != nil && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts { + if !s.audioCacheEmpty() && s.audioCacheFirstFramePts+maxAudioCacheDelayByAudio < pts { s.FlushAudio() } - if s.audioCacheFrames == nil { + if s.audioCacheEmpty() { s.audioCacheFirstFramePts = pts } @@ -323,6 +320,10 @@ func (s *Rtmp2MpegtsRemuxer) cacheAacSeqHeader(msg base.RtmpMsg) error { return err } +func (s *Rtmp2MpegtsRemuxer) audioSeqHeaderCached() bool { + return s.ascCtx != nil +} + func (s *Rtmp2MpegtsRemuxer) appendSpsPps(out []byte) ([]byte, error) { if s.spspps == nil { return out, base.ErrHls @@ -332,12 +333,28 @@ func (s *Rtmp2MpegtsRemuxer) appendSpsPps(out []byte) ([]byte, error) { return out, nil } +func (s *Rtmp2MpegtsRemuxer) videoSeqHeaderCached() bool { + return s.spspps != nil +} + +func (s *Rtmp2MpegtsRemuxer) audioCacheEmpty() bool { + return len(s.audioCacheFrames) == 0 +} + +func (s *Rtmp2MpegtsRemuxer) resetAudioCache() { + s.audioCacheFrames = s.audioCacheFrames[0:0] +} + +func (s *Rtmp2MpegtsRemuxer) resetVideoOutBuffer() { + s.videoOut = s.videoOut[0:0] +} + func (s *Rtmp2MpegtsRemuxer) onFrame(frame *mpegts.Frame) { var boundary bool if frame.Sid == mpegts.StreamIdAudio { // 为了考虑没有视频的情况也能切片,所以这里判断spspps为空时,也建议生成fragment - boundary = !s.VideoSeqHeaderCached() + boundary = !s.videoSeqHeaderCached() } else { // 收到视频,可能触发建立fragment的条件是: // 关键帧数据 && @@ -346,17 +363,14 @@ func (s *Rtmp2MpegtsRemuxer) onFrame(frame *mpegts.Frame) { // (收到过音频seq header && fragment没有打开) || 说明 音视频都有,且都已ready // (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) 说明 为什么音频缓存需不为空? // ) - boundary = frame.Key && (!s.AudioSeqHeaderCached() || !s.opened || !s.AudioCacheEmpty()) + boundary = frame.Key && (!s.audioSeqHeaderCached() || !s.opened || !s.audioCacheEmpty()) } if boundary { s.opened = true } - var packets []byte // TODO(chef): [refactor] - mpegts.PackTsPacket(frame, func(packet []byte) { - packets = append(packets, packet...) - }) + packets := frame.Pack() s.observer.OnTsPackets(packets, frame, boundary) }