Skip to content

Commit

Permalink
[perf] mpegts: 优化转换mpegts的性能
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Mar 12, 2022
1 parent 11c412c commit 92c0c72
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 121 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ image:
.PHONY: clean
clean:
rm -rf ./bin ./lal_record ./logs coverage.txt
find ./pkg -name 'lal_record' | xargs rm -rf
find ./pkg -name 'logs' | xargs rm -rf

.PHONY: all
all: build test
2 changes: 1 addition & 1 deletion pkg/aac/aac.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (ascCtx *AscContext) GetSamplingFrequency() (int, error) {
case AscSamplingFrequencyIndex44100:
return 44100, nil
}
return -1, fmt.Errorf("%w. index=%d", base.ErrSamplingFrequencyIndex, ascCtx.SamplingFrequencyIndex)
return -1, fmt.Errorf("%w. asCtx=%+v", base.ErrSamplingFrequencyIndex, ascCtx)
}

type AdtsHeaderContext struct {
Expand Down
9 changes: 4 additions & 5 deletions pkg/hls/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

type MuxerObserver interface {
FlushAudio()
OnFragmentOpen()
}

// MuxerConfig
Expand All @@ -43,7 +43,7 @@ const (

// Muxer
//
// 输入rtmp流,转出hls(m3u8+ts)至文件中,并回调给上层转出ts流
// 输入mpegts流,转出hls(m3u8+ts)至文件中
//
type Muxer struct {
UniqueKey string
Expand Down Expand Up @@ -122,7 +122,6 @@ func (m *Muxer) Start() {

func (m *Muxer) Dispose() {
Log.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey)
m.observer.FlushAudio()
if err := m.closeFragment(true); err != nil {
Log.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err)
}
Expand All @@ -132,7 +131,7 @@ func (m *Muxer) Dispose() {

// OnPatPmt OnTsPackets
//
// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接挂载 remux.Rtmp2MpegtsRemuxer 接收rtmp转Mpegts的数据
// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接将 remux.Rtmp2MpegtsRemuxer 的数据喂入 hls.Muxer
//
func (m *Muxer) OnPatPmt(b []byte) {
m.FeedPatPmt(b)
Expand Down Expand Up @@ -293,7 +292,7 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {
m.fragTs = ts

// nrm said: start fragment with audio to make iPhone happy
m.observer.FlushAudio()
m.observer.OnFragmentOpen()

return nil
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/hls/var.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,3 @@ var (

Log = nazalog.GetGlobalLogger()
)

var (
calcFragmentHeaderQueueSize = 16
)
85 changes: 49 additions & 36 deletions pkg/logic/group__core_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,30 @@ import (
)

// group__streaming.go
//
// 包含group中音视频数据转发、转封装协议的逻辑
//

// ---------------------------------------------------------------------------------------------------------------------
// 输入rtmp类型的数据
//
// OnReadRtmpAvMsg 来自 rtmp.ServerSession(Pub), rtmp.PullSession, remux.DummyAudioFilter 的回调

// OnReadRtmpAvMsg
//
// onRtmpMsgFromRemux 来自内部协议转换
// 输入rtmp数据.
// 来自 rtmp.ServerSession(Pub), rtmp.PullSession, (remux.DummyAudioFilter) 的回调.
//
// ---------------------------------------------------------------------------------------------------------------------

// OnReadRtmpAvMsg ...
func (group *Group) OnReadRtmpAvMsg(msg base.RtmpMsg) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.broadcastByRtmpMsg(msg)
}

func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) {
group.broadcastByRtmpMsg(msg)
}

// ---------------------------------------------------------------------------------------------------------------------
// rtp类型以及rtp组成帧之后的数据
//
// OnSdp, OnRtpPacket, OnAvPacket 来自 rtsp.PubSession 的回调

// OnSdp OnRtpPacket OnAvPacket
//
// onSdpFromRemux, onRtpPacketFromRemux 来自内部协议转换
// 输入rtsp(rtp)和rtp合帧之后的数据.
// 来自 rtsp.PubSession 的回调.
//
// ---------------------------------------------------------------------------------------------------------------------

// OnSdp ...
func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.mutex.Lock()
defer group.mutex.Unlock()
Expand All @@ -62,23 +54,13 @@ func (group *Group) OnSdp(sdpCtx sdp.LogicContext) {
group.rtsp2RtmpRemuxer.OnSdp(sdpCtx)
}

// onSdpFromRemux ...
func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) {
group.sdpCtx = &sdpCtx
}

// OnRtpPacket ...
func (group *Group) OnRtpPacket(pkt rtprtcp.RtpPacket) {
group.mutex.Lock()
defer group.mutex.Unlock()
group.feedRtpPacket(pkt)
}

// onRtpPacketFromRemux ...
func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) {
group.feedRtpPacket(pkt)
}

// OnAvPacket ...
func (group *Group) OnAvPacket(pkt base.AvPacket) {
group.mutex.Lock()
Expand All @@ -87,13 +69,12 @@ func (group *Group) OnAvPacket(pkt base.AvPacket) {
}

// ---------------------------------------------------------------------------------------------------------------------
// mpegts类型的数据

// OnPatPmt OnTsPackets
//
// OnPatPmt, OnTsPackets 来自 Rtmp2MpegtsRemuxerObserver 的回调
// 输入mpegts数据.
// 来自 remux.Rtmp2MpegtsRemuxer 的回调.
//
// ---------------------------------------------------------------------------------------------------------------------

// OnPatPmt ...
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b

Expand All @@ -111,8 +92,40 @@ func (group *Group) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary
group.feedTsPackets(tsPackets, frame, boundary)
}

func (group *Group) FlushAudio() {
//to be continued
// ---------------------------------------------------------------------------------------------------------------------

// onRtmpMsgFromRemux
//
// 输入rtmp数据.
// 来自 remux.AvPacket2RtmpRemuxer 的回调.
//
func (group *Group) onRtmpMsgFromRemux(msg base.RtmpMsg) {
group.broadcastByRtmpMsg(msg)
}

// ---------------------------------------------------------------------------------------------------------------------

// onSdpFromRemux onRtpPacketFromRemux
//
// 输入rtsp(rtp)数据.
// 来自 remux.Rtmp2RtspRemuxer 的回调.
//
func (group *Group) onSdpFromRemux(sdpCtx sdp.LogicContext) {
group.sdpCtx = &sdpCtx
}

// onRtpPacketFromRemux ...
func (group *Group) onRtpPacketFromRemux(pkt rtprtcp.RtpPacket) {
group.feedRtpPacket(pkt)
}

// ---------------------------------------------------------------------------------------------------------------------

// OnFragmentOpen
//
// 来自 hls.Muxer 的回调
//
func (group *Group) OnFragmentOpen() {
group.rtmp2MpegtsRemuxer.FlushAudio()
}

Expand Down Expand Up @@ -382,7 +395,7 @@ func (group *Group) feedRtpPacket(pkt rtprtcp.RtpPacket) {
// ---------------------------------------------------------------------------------------------------------------------

func (group *Group) feedTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
// TODO(chef): [opt] 重构remux 2 ts后,hls的输入必须放在http ts的输入之前,保证hls重新切片时可以先flush audio
// 注意,hls的处理放在前面,让hls先判断是否打开新的fragment并flush audio
if group.hlsMuxer != nil {
group.hlsMuxer.FeedMpegts(tsPackets, frame, boundary)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/logic/group__in.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func (group *Group) addIn() {
// delIn 有pub或pull的输入型session离开时,需要调用该函数
//
func (group *Group) delIn() {
// 注意,remuxer放前面,使得有机会将内部缓存的数据吐出来
if group.rtmp2MpegtsRemuxer != nil {
group.rtmp2MpegtsRemuxer.Dispose()
group.rtmp2MpegtsRemuxer = nil
}

group.stopPushIfNeeded()
group.stopHlsIfNeeded()
group.stopRecordFlvIfNeeded()
Expand All @@ -174,10 +180,6 @@ func (group *Group) delIn() {
group.rtspPubSession = nil
group.rtsp2RtmpRemuxer = nil
group.rtmp2RtspRemuxer = nil
if group.rtmp2MpegtsRemuxer != nil {
group.rtmp2MpegtsRemuxer.FlushAudio() // TODO(chef): [refactor]
group.rtmp2MpegtsRemuxer = nil
}
group.dummyAudioFilter = nil
group.rtmpGopCache.Clear()
group.httpflvGopCache.Clear()
Expand Down
53 changes: 29 additions & 24 deletions pkg/mpegts/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

package mpegts

// Frame 帧数据
// Frame 帧数据,用于打包成mpegts格式的数据
//
type Frame struct {
Pts uint64 // =(毫秒 * 90)
Expand All @@ -34,33 +34,38 @@ type Frame struct {
Raw []byte
}

// OnTsPacket @param packet: 188字节大小的TS包,注意,一次Pack对应的多个TSPacket,复用的是一块内存
// Pack annexb格式的流转换为mpegts流
//
type OnTsPacket func(packet []byte)

func PackTsPacket(frame *Frame, onTsPacket OnTsPacket) {
packet := make([]byte, 188)
PackToTsPacket(frame, packet, onTsPacket)
}

// PackToTsPacket Annexb格式的流转换为mpegts packet
//
// @param frame: 各字段含义见mpegts.Frame结构体定义
// frame.CC 注意,内部会修改frame.CC的值,外部在调用结束后,可保存CC的值,供下次调用时使用
// frame.Raw 函数调用结束后,内部不会持有该内存块
// 注意,内部会增加 Frame.Cc 的值.
//
// @param packet: 打包ts packet时的写入内存
// @return: 内存块为独立申请,调度结束后,内部不再持有
//
// @param onTsPacket: 注意,一次函数调用,可能对应多次回调
//
func PackToTsPacket(frame *Frame, packet []byte, onTsPacket OnTsPacket) {
wpos := 0 // 当前packet的写入位置
lpos := 0 // 当前帧的处理位置
rpos := len(frame.Raw) // 当前帧大小
func (frame *Frame) Pack() []byte {
bufLen := len(frame.Raw) * 2 // 预分配一块足够大的内存
if bufLen < 188 {
bufLen = 188
}
buf := make([]byte, bufLen)

lpos := 0 // 当前输入帧的处理位置
rpos := len(frame.Raw) // 当前输入帧大小
first := true // 是否为帧的首个packet的标准
packetPosAtBuf := 0 // 当前输出packet相对于整个输出内存块的位置

for lpos != rpos {
wpos = 0

// TODO(chef): CHEFNOTICEME 正常来说,预分配的内存应该是足够用了,我们加个扩容逻辑保证绝对正确性,并且加个日志观察一段时间
if packetPosAtBuf+188 > len(buf) {
Log.Warnf("buffer too short. frame size=%d, buf=%d, packetPosAtBuf=%d", len(frame.Raw), len(buf), packetPosAtBuf)
newBuf := make([]byte, packetPosAtBuf+188)
copy(newBuf, buf)
buf = newBuf
}

packet := buf[packetPosAtBuf : packetPosAtBuf+188] // 当前输出packet
wpos := 0 // 当前输出packet的写入位置
packetPosAtBuf += 188

frame.Cc++

// 每个packet都需要添加TS Header
Expand Down Expand Up @@ -228,9 +233,9 @@ func PackToTsPacket(frame *Frame, packet []byte, onTsPacket OnTsPacket) {
copy(packet[wpos:], frame.Raw[lpos:lpos+inSize])
lpos = rpos
}

onTsPacket(packet)
}

return buf[:packetPosAtBuf]
}

// ----- private -------------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 92c0c72

Please sign in to comment.