Skip to content

Commit

Permalink
-) 增加 rtmp.HandshakeClientComplex 复杂握手模式 -) 整理一些 struct 的 Dispose 方法 …
Browse files Browse the repository at this point in the history
…-) 整理日志 -) 增加一些单元测试和 benchmark -) 整理 lal 项目的 roadmap -) 更新 nezha 版本至 0.1.0
  • Loading branch information
q191201771 committed Sep 12, 2019
1 parent d83255e commit 25523bc
Show file tree
Hide file tree
Showing 17 changed files with 326 additions and 157 deletions.
37 changes: 36 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,42 @@ $git clone https://github.com/q191201771/lal.git && cd lal && ./build.sh

#### roadmap

第一阶段:实现rtmp转发服务器
有建议、意见、bug、功能等等欢迎提 issue 啊,100% 会回复的。

lal 服务器目标版本roadmap如下:

**v1.0.0**

实现 rtmp 转发服务器。目前已经基本完成了。大概在十一假期发布。

- 各种 rtmp 推流、拉流客户端兼容性测试
- 和其它主流 rtmp 服务器的性能对比测试
- 整理日志
- 调整框架代码
- 稳定性测试

**v1.0.0**

- Gop 缓存功能
- http-flv 拉流
- rtmp 转推
- rtmp 回源
- http-flv 回源

**v2.0.0**

- hls

**v3.0.0**

- udp quic srt
- rtp/rtcp
- webrtc

**v4.0.0**

- 分布式。提供与外部调度系统交互的接口。应对多级分发场景,或平级源站类型场景


最终目标:

Expand Down
39 changes: 14 additions & 25 deletions app/lal/group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/q191201771/nezha/pkg/log"
"github.com/q191201771/nezha/pkg/unique"
"sync"
"time"
)

// TODO chef: 没有sub了一定时间后,停止pull
Expand All @@ -16,7 +15,7 @@ type GroupManager struct {
appName string
streamName string

exitChan chan bool
exitChan chan struct{}
rtmpGroup *rtmp.Group
httpFlvGroup *httpflv.Group
mutex sync.Mutex
Expand All @@ -26,34 +25,24 @@ type GroupManager struct {

func NewGroupManager(appName string, streamName string, config *Config) *GroupManager {
uk := unique.GenUniqueKey("GROUPMANAGER")
log.Infof("lifecycle new GroupManager. [%s] appName=%s streamName=%s", uk, appName, streamName)
log.Infof("lifecycle new lal.GroupManager. [%s] appName=%s streamName=%s", uk, appName, streamName)

return &GroupManager{
config: config,
appName: appName,
streamName: streamName,
exitChan: make(chan bool),
exitChan: make(chan struct{}),
UniqueKey: uk,
}
}

func (gm *GroupManager) RunLoop() {
t := time.NewTicker(200 * time.Millisecond)
defer t.Stop()

for {
select {
case <-gm.exitChan:
return
case <-t.C:
// noop
}
}
<- gm.exitChan
}

func (gm *GroupManager) Dispose(err error) {
log.Infof("lifecycle dispose Group. [%s] reason=%v", gm.UniqueKey, err)
gm.exitChan <- true
log.Infof("lifecycle dispose lal.GroupManager. [%s] reason=%v", gm.UniqueKey, err)
gm.exitChan <- struct{}{}
}

// 返回true则允许推流,返回false则关闭连接
Expand Down Expand Up @@ -82,6 +71,12 @@ func (gm *GroupManager) IsTotalEmpty() bool {
(gm.httpFlvGroup == nil || gm.httpFlvGroup.IsTotalEmpty())
}

// GroupObserver of rtmp.Group
func (gm *GroupManager) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {

// TODO chef: broadcast to httpflv.Group
}

// GroupObserver of httpflv.Group
func (gm *GroupManager) ReadHTTPRespHeaderCB() {
// noop
Expand All @@ -99,17 +94,11 @@ func (gm *GroupManager) ReadFlvTagCB(tag *httpflv.Tag) {
// TODO chef: broadcast to rtmp.Group
}

// GroupObserver of rtmp.Group
func (gm *GroupManager) ReadRTMPAVMsgCB(header rtmp.Header, timestampAbs uint32, message []byte) {

// TODO chef: broadcast to httpflv.Group
}

func (gm *GroupManager) attachRTMPGroup(rtmpGroup *rtmp.Group) {
gm.mutex.Lock()
defer gm.mutex.Unlock()
if gm.rtmpGroup != nil && gm.rtmpGroup != rtmpGroup {
log.Warnf("CHEFNOTICEME %+v %+v", gm.rtmpGroup, rtmpGroup)
log.Warnf("duplicate rtmp group in group manager. %+v %+v", gm.rtmpGroup, rtmpGroup)
}
gm.rtmpGroup = rtmpGroup
rtmpGroup.SetObserver(gm)
Expand All @@ -119,7 +108,7 @@ func (gm *GroupManager) attachHTTPFlvGroup(httpFlvGroup *httpflv.Group) {
gm.mutex.Lock()
defer gm.mutex.Unlock()
if gm.httpFlvGroup != nil && gm.httpFlvGroup != httpFlvGroup {
log.Warnf("CHEFNOTICEME %+v %+v", gm.httpFlvGroup, httpFlvGroup)
log.Warnf("duplicate http flv group in group manager. %+v %+v", gm.httpFlvGroup, httpFlvGroup)
}
gm.httpFlvGroup = httpFlvGroup
httpFlvGroup.SetObserver(gm)
Expand Down
8 changes: 5 additions & 3 deletions app/lal/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func parseFlag() string {
flag.Parse()
if *binInfoFlag {
_, _ = fmt.Fprint(os.Stderr, bininfo.StringifyMultiLine())
os.Exit(1)
os.Exit(0)
}
if *cf == "" {
flag.Usage()
Expand Down Expand Up @@ -62,8 +62,10 @@ func loadConf(confFile string) *Config {
}

func runWebPProf() {
log.Info("start web pprof listen. addr=:10001")
if err := http.ListenAndServe("0.0.0.0:10001", nil); err != nil {
// TODO chef: config me
addr := ":10001"
log.Infof("start web pprof listen. addr=%s", addr)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Error(err)
return
}
Expand Down
39 changes: 22 additions & 17 deletions app/lal/server_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,42 +52,40 @@ func (sm *ServerManager) RunLoop() {

t := time.NewTicker(1 * time.Second)
defer t.Stop()
// TODO chef: erase me, just for debug
tmpT := time.NewTicker(10 * time.Second)
defer tmpT.Stop()
var count uint32
for {
select {
case <-sm.exitChan:
return
case <-t.C:
sm.check()
case <-tmpT.C:
// TODO chef: lock
log.Infof("group size:%d", len(sm.groupManagerMap))
count++
if (count % 10) == 0 {
sm.mutex.Lock()
log.Infof("group size:%d", len(sm.groupManagerMap))
sm.mutex.Unlock()
}
}
}
}

func (sm *ServerManager) Dispose() {
log.Debug("Dispose manager.")
log.Debug("dispose server manager.")
if sm.httpFlvServer != nil {
sm.httpFlvServer.Dispose()
}
sm.rtmpServer.Dispose()
sm.exitChan <- true
if sm.rtmpServer != nil {
sm.rtmpServer.Dispose()
}

sm.mutex.Lock()
defer sm.mutex.Unlock()
for _, gm := range sm.groupManagerMap {
gm.Dispose(lalErr)
}
sm.groupManagerMap = nil
}
sm.mutex.Unlock()

// ServerObserver of httpflv.Server
func (sm *ServerManager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) bool {
gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName)
gm.AddHTTPFlvSubSession(session, httpFlvGroup)
return true
sm.exitChan <- true
}

// ServerObserver of rtmp.Server
Expand All @@ -103,12 +101,19 @@ func (sm *ServerManager) NewRTMPSubSessionCB(session *rtmp.ServerSession, rtmpGr
return true
}

// ServerObserver of httpflv.Server
func (sm *ServerManager) NewHTTPFlvSubSessionCB(session *httpflv.SubSession, httpFlvGroup *httpflv.Group) bool {
gm := sm.getOrCreateGroupManager(session.AppName, session.StreamName)
gm.AddHTTPFlvSubSession(session, httpFlvGroup)
return true
}

func (sm *ServerManager) check() {
sm.mutex.Lock()
defer sm.mutex.Unlock()
for k, gm := range sm.groupManagerMap {
if gm.IsTotalEmpty() {
log.Infof("erase empty group. [%s]", gm.UniqueKey)
log.Infof("erase empty group manager. [%s]", gm.UniqueKey)
gm.Dispose(lalErr)
delete(sm.groupManagerMap, k)
}
Expand Down
23 changes: 9 additions & 14 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,12 @@ LDFlags=" \
-X 'github.com/q191201771/nezha/pkg/bininfo.BuildGoVersion=${BuildGoVersion}' \
"

cd ${ROOT_DIR}/app/lal && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/lal

cd ${ROOT_DIR}/app/flvfile2rtmppush && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/flvfile2rtmppush

cd ${ROOT_DIR}/app/flvfile2es && go build -o ${ROOT_DIR}/bin/flvfile2es

cd ${ROOT_DIR}/app/httpflvpull && go build -o ${ROOT_DIR}/bin/httpflvpull

cd ${ROOT_DIR}/app/modflvfile && go build -o ${ROOT_DIR}/bin/modflvfile

cd ${ROOT_DIR}/app/rtmppull && go build -o ${ROOT_DIR}/bin/rtmppull

${ROOT_DIR}/bin/lal -v
ls -lrt ${ROOT_DIR}/bin
cd ${ROOT_DIR}/app/lal && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/lal &&
cd ${ROOT_DIR}/app/flvfile2rtmppush && go build -ldflags "$LDFlags" -o ${ROOT_DIR}/bin/flvfile2rtmppush &&
cd ${ROOT_DIR}/app/flvfile2es && go build -o ${ROOT_DIR}/bin/flvfile2es &&
cd ${ROOT_DIR}/app/httpflvpull && go build -o ${ROOT_DIR}/bin/httpflvpull &&
cd ${ROOT_DIR}/app/modflvfile && go build -o ${ROOT_DIR}/bin/modflvfile &&
cd ${ROOT_DIR}/app/rtmppull && go build -o ${ROOT_DIR}/bin/rtmppull &&
${ROOT_DIR}/bin/lal -v &&
ls -lrt ${ROOT_DIR}/bin &&
echo 'build done.'
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ module github.com/q191201771/lal

go 1.12

require github.com/q191201771/nezha v0.0.2-0.20190909120248-23806813af03
require github.com/q191201771/nezha v0.1.0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
github.com/q191201771/nezha v0.0.2-0.20190909120248-23806813af03 h1:or7KshWRYtGDSLeFnkitJrnGqc1mt7CRHFG8uk0X4gs=
github.com/q191201771/nezha v0.0.2-0.20190909120248-23806813af03/go.mod h1:Rd4R+bJRemlSUnz7KHmSX6ZQlsHLBjT7wlzuLeOia/M=
github.com/q191201771/nezha v0.1.0 h1:ZCFC5g9Vc5jNGG/hSMMBxF2EF8BsWiBMMTMqdnM1Uew=
github.com/q191201771/nezha v0.1.0/go.mod h1:Rd4R+bJRemlSUnz7KHmSX6ZQlsHLBjT7wlzuLeOia/M=
31 changes: 15 additions & 16 deletions pkg/rtmp/amf0.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ package rtmp

import (
"bytes"
"errors"
"github.com/q191201771/nezha/pkg/bele"
"io"
)

"errors"
var (
ErrAMFInvalidType = errors.New("lal.AMF0: invalid type.")
ErrAMFTooShort = errors.New("lal.AMF0: too short.")
)

const (
Expand All @@ -21,25 +25,20 @@ const (
AMF0TypeMarkerObjectEnd = uint8(0x09)
AMF0TypeMarkerLongString = uint8(0x0c)

AMF0TypeMarkerMovieclip = uint8(0x04)
AMF0TypeMarkerUndefined = uint8(0x06)
AMF0TypeMarkerReference = uint8(0x07)
AMF0TypeMarkerEcmaArray = uint8(0x08)
AMF0TypeMarkerStrictArray = uint8(0x0a)
AMF0TypeMarkerData = uint8(0x0b)
AMF0TypeMarkerUnsupported = uint8(0x0d)
AMF0TypeMarkerRecordset = uint8(0x0e)
AMF0TypeMarkerXmlDocument = uint8(0x0f)
AMF0TypeMarkerTypedObject = uint8(0x10)
//AMF0TypeMarkerMovieclip = uint8(0x04)
//AMF0TypeMarkerUndefined = uint8(0x06)
//AMF0TypeMarkerReference = uint8(0x07)
//AMF0TypeMarkerEcmaArray = uint8(0x08)
//AMF0TypeMarkerStrictArray = uint8(0x0a)
//AMF0TypeMarkerData = uint8(0x0b)
//AMF0TypeMarkerUnsupported = uint8(0x0d)
//AMF0TypeMarkerRecordset = uint8(0x0e)
//AMF0TypeMarkerXmlDocument = uint8(0x0f)
//AMF0TypeMarkerTypedObject = uint8(0x10)
)

var AMF0TypeMarkerObjectEndBytes = []byte{0, 0, AMF0TypeMarkerObjectEnd}

var (
ErrAMFInvalidType = errors.New("lal.AMF0: invalid type.")
ErrAMFTooShort = errors.New("lal.AMF0: too short.")
)

type ObjectPair struct {
key string
value interface{}
Expand Down
25 changes: 25 additions & 0 deletions pkg/rtmp/amf0_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,28 @@ func TestAMF0Corner(t *testing.T) {
}
_ = AMF0.WriteObject(mw, objs)
}

func BenchmarkAmf0_ReadObject(b *testing.B) {
out := &bytes.Buffer{}
objs := []ObjectPair{
{"air", 3},
{"ban", "cat"},
{"dog", true},
}
_ = AMF0.WriteObject(out, objs)
for i := 0; i < b.N; i++ {
_, _, _ = AMF0.ReadObject(out.Bytes())
}
}

func BenchmarkAmf0_WriteObject(b *testing.B) {
out := &bytes.Buffer{}
objs := []ObjectPair{
{"air", 3},
{"ban", "cat"},
{"dog", true},
}
for i := 0; i < b.N; i++ {
_ = AMF0.WriteObject(out, objs)
}
}
Loading

0 comments on commit 25523bc

Please sign in to comment.