Skip to content

Commit

Permalink
- [refactor] 将rtmp转ts的代码从hls重构至remux中 - [fix] mpegts: 修复单音频场景,有一帧音频重复的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Mar 9, 2022
1 parent cd31d9a commit 11c412c
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 166 deletions.
8 changes: 5 additions & 3 deletions app/demo/pullrtmp2hls/pullrtmp2hls.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"os"
"path/filepath"

"github.com/q191201771/lal/pkg/remux"

"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/rtmp"
Expand Down Expand Up @@ -46,13 +48,13 @@ func main() {
hlsMuexer := hls.NewMuxer(streamName, true, &hlsMuxerConfig, nil)
hlsMuexer.Start()

rtmp2Mpegts := remux.NewRtmp2MpegtsRemuxer(hlsMuexer)

pullSession := rtmp.NewPullSession(func(option *rtmp.PullSessionOption) {
option.PullTimeoutMs = 10000
option.ReadAvTimeoutMs = 10000
})
err = pullSession.Pull(url, func(msg base.RtmpMsg) {
hlsMuexer.FeedRtmpMessage(msg)
})
err = pullSession.Pull(url, rtmp2Mpegts.FeedRtmpMessage)

if err != nil {
nazalog.Fatalf("pull rtmp failed. err=%+v", err)
Expand Down
18 changes: 9 additions & 9 deletions pkg/base/unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ const (
UkPreTsSubSession = "TSSUB"
UkPreFlvPullSession = "FLVPULL"

UkPreGroup = "GROUP"
UkPreHlsMuxer = "HLSMUXER"
UkPreStreamer = "STREAMER"
UkPreGroup = "GROUP"
UkPreHlsMuxer = "HLSMUXER"
UkPreRtmp2MpegtsRemuxer = "RTMP2MPEGTS"
)

//func GenUk(prefix string) string {
Expand Down Expand Up @@ -84,8 +84,8 @@ func GenUkHlsMuxer() string {
return siUkHlsMuxer.GenUniqueKey()
}

func GenUkStreamer() string {
return siUkStreamer.GenUniqueKey()
func GenUkRtmp2MpegtsRemuxer() string {
return siUkRtmp2MpegtsRemuxer.GenUniqueKey()
}

var (
Expand All @@ -101,9 +101,9 @@ var (
siUkTsSubSession *unique.SingleGenerator
siUkFlvPullSession *unique.SingleGenerator

siUkGroup *unique.SingleGenerator
siUkHlsMuxer *unique.SingleGenerator
siUkStreamer *unique.SingleGenerator
siUkGroup *unique.SingleGenerator
siUkHlsMuxer *unique.SingleGenerator
siUkRtmp2MpegtsRemuxer *unique.SingleGenerator
)

func init() {
Expand All @@ -121,5 +121,5 @@ func init() {

siUkGroup = unique.NewSingleGenerator(UkPreGroup)
siUkHlsMuxer = unique.NewSingleGenerator(UkPreHlsMuxer)
siUkStreamer = unique.NewSingleGenerator(UkPreStreamer)
siUkRtmp2MpegtsRemuxer = unique.NewSingleGenerator(UkPreRtmp2MpegtsRemuxer)
}
67 changes: 23 additions & 44 deletions pkg/hls/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,8 @@ import (
"github.com/q191201771/lal/pkg/base"
)

// TODO chef: 转换TS流的功能(通过回调供httpts使用)也放在了Muxer中,好处是hls和httpts可以共用一份TS流。
// 后续从架构上考虑,packet hls,mpegts,logic的分工

type MuxerObserver interface {
OnPatPmt(b []byte)

// OnTsPackets
//
// @param rawFrame: TS流,回调结束后,内部不再使用该内存块
//
// @param boundary: 新的TS流接收者,应该从该标志为true时开始发送数据
//
OnTsPackets(rawFrame []byte, boundary bool)
FlushAudio()
}

// MuxerConfig
Expand Down Expand Up @@ -86,8 +75,7 @@ type Muxer struct {
frag int // frag 写入m3u8的EXT-X-MEDIA-SEQUENCE字段
frags []fragmentInfo // frags TS文件的固定大小环形队列,记录TS的信息

streamer *Streamer
patpmt []byte
patpmt []byte
}

// 记录fragment的一些信息,注意,写m3u8文件时可能还需要用到历史fragment的信息
Expand Down Expand Up @@ -123,8 +111,6 @@ func NewMuxer(streamName string, enable bool, config *MuxerConfig, observer Muxe
observer: observer,
}
m.makeFrags()
streamer := NewStreamer(m)
m.streamer = streamer
Log.Infof("[%s] lifecycle new hls muxer. muxer=%p, streamName=%s", uk, m, streamName)
return m
}
Expand All @@ -136,30 +122,33 @@ func (m *Muxer) Start() {

func (m *Muxer) Dispose() {
Log.Infof("[%s] lifecycle dispose hls muxer.", m.UniqueKey)
m.streamer.FlushAudio()
m.observer.FlushAudio()
if err := m.closeFragment(true); err != nil {
Log.Errorf("[%s] close fragment error. err=%+v", m.UniqueKey, err)
}
}

// FeedRtmpMessage @param msg 函数调用结束后,内部不持有msg中的内存块
// ---------------------------------------------------------------------------------------------------------------------

// OnPatPmt OnTsPackets
//
// 实现 remux.Rtmp2MpegtsRemuxerObserver,方便直接挂载 remux.Rtmp2MpegtsRemuxer 接收rtmp转Mpegts的数据
//
func (m *Muxer) FeedRtmpMessage(msg base.RtmpMsg) {
m.streamer.FeedRtmpMessage(msg)
func (m *Muxer) OnPatPmt(b []byte) {
m.FeedPatPmt(b)
}

func (m *Muxer) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
m.FeedMpegts(tsPackets, frame, boundary)
}

// ----- implement StreamerObserver of Streamer ------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------

func (m *Muxer) OnPatPmt(b []byte) {
func (m *Muxer) FeedPatPmt(b []byte) {
m.patpmt = b
if m.observer != nil {
m.observer.OnPatPmt(b)
}
}

func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool) {
var packets []byte

func (m *Muxer) FeedMpegts(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
if frame.Sid == mpegts.StreamIdAudio {
// TODO(chef): 为什么音频用pts,视频用dts
if err := m.updateFragment(frame.Pts, boundary); err != nil {
Expand All @@ -168,7 +157,7 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool)
}
// TODO(chef): 有updateFragment的返回值判断,这里的判断可以考虑删除
if !m.opened {
Log.Warnf("[%s] OnFrame A not opened. boundary=%t", m.UniqueKey, boundary)
Log.Warnf("[%s] FeedMpegts A not opened. boundary=%t", m.UniqueKey, boundary)
return
}
//Log.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
Expand All @@ -179,25 +168,15 @@ func (m *Muxer) OnFrame(streamer *Streamer, frame *mpegts.Frame, boundary bool)
}
// TODO(chef): 有updateFragment的返回值判断,这里的判断可以考虑删除
if !m.opened {
Log.Warnf("[%s] OnFrame V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key)
Log.Warnf("[%s] FeedMpegts V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key)
return
}
//Log.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw))
}

mpegts.PackTsPacket(frame, func(packet []byte) {
if m.enable {
if err := m.fragment.WriteFile(packet); err != nil {
Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)
return
}
}
if m.observer != nil {
packets = append(packets, packet...)
}
})
if m.observer != nil {
m.observer.OnTsPackets(packets, boundary)
if err := m.fragment.WriteFile(tsPackets); err != nil {
Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)
return
}
}

Expand Down Expand Up @@ -314,7 +293,7 @@ func (m *Muxer) openFragment(ts uint64, discont bool) error {
m.fragTs = ts

// nrm said: start fragment with audio to make iPhone happy
m.streamer.FlushAudio()
m.observer.FlushAudio()

return nil
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/innertest/iface_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ var _ rtsp.PubSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ hls.MuxerObserver = &logic.Group{}
var _ rtsp.BaseInSessionObserver = &logic.Group{} //
var _ rtsp.BaseInSessionObserver = &remux.AvPacket2RtmpRemuxer{}
var _ remux.Rtmp2MpegtsRemuxerObserver = &hls.Muxer{}

var _ rtmp.ServerSessionObserver = &rtmp.Server{}
var _ rtmp.IHandshakeClient = &rtmp.HandshakeClientSimple{}
Expand All @@ -199,5 +200,3 @@ var _ rtsp.IInterleavedPacketWriter = &rtsp.PubSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.SubSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ClientCommandSession{}
var _ rtsp.IInterleavedPacketWriter = &rtsp.ServerCommandSession{}

var _ hls.StreamerObserver = &hls.Muxer{}
16 changes: 9 additions & 7 deletions pkg/innertest/innertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ func entry() {
_ = ioutil.WriteFile(wTsPullFileName, b, 0666)
assert.Equal(t, goldenHttptsLenList[mode], len(b))
assert.Equal(t, goldenHttptsMd5List[mode], nazamd5.Md5(b))
Log.Debugf("CHEFGREPME < get. %d %d", len(b), httptsSize.Load())
}()
time.Sleep(100 * time.Millisecond)

Expand Down Expand Up @@ -278,12 +277,12 @@ func entry() {

for {
if httpflvPullTagCount.Load() == uint32(fileTagCount) &&
rtmpPullTagCount.Load() == uint32(fileTagCount) &&
rtmpPullTagCount.Load() == uint32(fileTagCount) &&
httptsSize.Load() == uint32(goldenHttptsLenList[mode]) {
break
}
nazalog.Debugf("%d %d %d", httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), httptsSize.Load())
nazalog.Debugf("%d(%d, %d) %d(%d)",
fileTagCount, httpflvPullTagCount.Load(), rtmpPullTagCount.Load(), goldenHttptsLenList[mode], httptsSize.Load())
time.Sleep(100 * time.Millisecond)
}

Expand Down Expand Up @@ -399,6 +398,9 @@ func getHttpts() ([]byte, error) {
if err != nil {
return buf.Bytes(), err
}
if buf.Len() == goldenHttptsLenList[mode] {
return buf.Bytes(), nil
}
}
}

Expand Down Expand Up @@ -442,17 +444,17 @@ var (
}

goldenHlsTsNumList = []int{8, 10, 8}
goldenHlsTsLenList = []int{2219152, 553848, 1696512}
goldenHlsTsLenList = []int{2219152, 525648, 1696512}
goldenHlsTsMd5List = []string{
"48db6251d40c271fd11b05650f074e0f",
"4914801b64545742aef8d18c9470b01a",
"2eb19ad498688dadf950b3e749985922",
"2d1e5c1a3ca385e0b55b2cff2f52b710",
}

goldenHttptsLenList = []int{2216332, 550464, 1693880}
goldenHttptsLenList = []int{2216332, 522264, 1693880}
goldenHttptsMd5List = []string{
"03f8eac7d2c3d5d85056c410f5fcc756",
"46008af23409e41ac1f26aa11218ad0a",
"0d102b6fb7fc3134e56a07f00292e888",
"651a2b5c93370738d81995f8fd49af4d",
}
)
Expand Down
9 changes: 5 additions & 4 deletions pkg/logic/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type Group struct {

mutex sync.Mutex
// pub
rtmpPubSession *rtmp.ServerSession
rtspPubSession *rtsp.PubSession
rtsp2RtmpRemuxer *remux.AvPacket2RtmpRemuxer
rtmp2RtspRemuxer *remux.Rtmp2RtspRemuxer
rtmpPubSession *rtmp.ServerSession
rtspPubSession *rtsp.PubSession
rtsp2RtmpRemuxer *remux.AvPacket2RtmpRemuxer
rtmp2RtspRemuxer *remux.Rtmp2RtspRemuxer
rtmp2MpegtsRemuxer *remux.Rtmp2MpegtsRemuxer
// pull
pullEnable bool
pullUrl string
Expand Down
62 changes: 41 additions & 21 deletions pkg/logic/group__core_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package logic
import (
"net"

"github.com/q191201771/lal/pkg/mpegts"

"github.com/q191201771/lal/pkg/avc"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hevc"
Expand Down Expand Up @@ -87,14 +89,16 @@ func (group *Group) OnAvPacket(pkt base.AvPacket) {
// ---------------------------------------------------------------------------------------------------------------------
// mpegts类型的数据
//
// OnPatPmt, OnTsPackets 来自 hls.Muxer 的回调
// OnPatPmt, OnTsPackets 来自 Rtmp2MpegtsRemuxerObserver 的回调
//
// ---------------------------------------------------------------------------------------------------------------------

// OnPatPmt ...
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b

group.hlsMuxer.FeedPatPmt(b)

if group.recordMpegts != nil {
if err := group.recordMpegts.Write(b); err != nil {
Log.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
Expand All @@ -103,24 +107,13 @@ func (group *Group) OnPatPmt(b []byte) {
}

// OnTsPackets ...
func (group *Group) OnTsPackets(rawFrame []byte, boundary bool) {
for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.Write(group.patpmt)
session.Write(rawFrame)
session.IsFresh = false
}
} else {
session.Write(rawFrame)
}
}
func (group *Group) OnTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
group.feedTsPackets(tsPackets, frame, boundary)
}

if group.recordMpegts != nil {
if err := group.recordMpegts.Write(rawFrame); err != nil {
Log.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
func (group *Group) FlushAudio() {
//to be continued
group.rtmp2MpegtsRemuxer.FlushAudio()
}

// ---------------------------------------------------------------------------------------------------------------------
Expand All @@ -142,9 +135,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
return
}

// # hls
if group.config.HlsConfig.Enable && group.hlsMuxer != nil {
group.hlsMuxer.FeedRtmpMessage(msg)
// # mpegts remuxer
if group.config.HlsConfig.Enable || group.config.HttptsConfig.Enable {
group.rtmp2MpegtsRemuxer.FeedRtmpMessage(msg)
}

// # rtsp
Expand Down Expand Up @@ -388,6 +381,33 @@ func (group *Group) feedRtpPacket(pkt rtprtcp.RtpPacket) {

// ---------------------------------------------------------------------------------------------------------------------

func (group *Group) feedTsPackets(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
// TODO(chef): [opt] 重构remux 2 ts后,hls的输入必须放在http ts的输入之前,保证hls重新切片时可以先flush audio
if group.hlsMuxer != nil {
group.hlsMuxer.FeedMpegts(tsPackets, frame, boundary)
}

for session := range group.httptsSubSessionSet {
if session.IsFresh {
if boundary {
session.Write(group.patpmt)
session.Write(tsPackets)
session.IsFresh = false
}
} else {
session.Write(tsPackets)
}
}

if group.recordMpegts != nil {
if err := group.recordMpegts.Write(tsPackets); err != nil {
Log.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}
}

// ---------------------------------------------------------------------------------------------------------------------

func (group *Group) write2RtmpSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
Expand Down
Loading

0 comments on commit 11c412c

Please sign in to comment.