Skip to content

Commit

Permalink
[fix] 修复logic.Option.NotifyHandler首字母小写外部无法设置的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Oct 31, 2021
1 parent a3ae155 commit dacfa3d
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 28 deletions.
3 changes: 2 additions & 1 deletion app/lalserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func main() {
defer nazalog.Sync()

confFile := parseFlag()
sm := logic.NewLalServer(confFile)
sm := logic.NewLalServer(confFile, func(option *logic.Option) {
})
err := sm.RunLoop()
nazalog.Infof("server manager done. err=%+v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/logic/group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

package logic

// 注意,这个模块的功能不完全,目前只使用SimpleGroupManager

// ---------------------------------------------------------------------------------------------------------------------

type IGroupCreator interface {
Expand Down Expand Up @@ -94,6 +92,10 @@ func (s *SimpleGroupManager) Len() int {

// ---------------------------------------------------------------------------------------------------------------------

// ComplexGroupManager
//
// 注意,这个模块的功能不完全,目前只使用SimpleGroupManager
//
// TODO(chef):
//
// - 现有逻辑重构至当前模块中【DONE】
Expand All @@ -106,10 +108,8 @@ func (s *SimpleGroupManager) Len() int {
// - 更新相应的文档:本文件注释,server_manager等中原有关于appName的注释,配置文件文档,流地址列表文档
// - 创建group时没有appname,后面又有了,可以考虑更新一下
// - ComplexGroupManager使用IGroupCreator

//
// ---------------------------------------------------------------------------------------------------------------------

// ComplexGroupManager
//
// 背景:
// 有的协议需要结合appName和streamName作为流唯一标识(比如rtmp,httpflv,httpts)
Expand Down
8 changes: 4 additions & 4 deletions pkg/logic/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type ILalServer interface {
// Option struct中可修改的参数说明:
// - notifyHandler 事件监听
// 业务方可实现 INotifyHandler 接口并传入从而获取到对应的事件通知
// 注意,如果业务方实现了自己的事件监听,则lal server内部不再走http notify的逻辑
// 如果不填写,内部默认走http notify的逻辑(当然,还需要在配置文件中开启http notify功能
// 如果不填写保持默认值nil,内部默认走http notify的逻辑(当然,还需要在配置文件中开启http notify功能)
// 注意,如果业务方实现了自己的事件监听,则lal server内部不再走http notify的逻辑(也即二选一
//
func NewLalServer(confFile string, modOption ...ModOption) ILalServer {
return NewServerManager(confFile, modOption...)
Expand All @@ -57,11 +57,11 @@ type INotifyHandler interface {
}

type Option struct {
notifyHandler INotifyHandler
NotifyHandler INotifyHandler
}

var defaultOption = Option{
notifyHandler: nil, // 注意,为nil时,内部会赋值为 HttpNotify
NotifyHandler: nil, // 注意,为nil时,内部会赋值为 HttpNotify
}

type ModOption func(option *Option)
Expand Down
46 changes: 28 additions & 18 deletions pkg/logic/server_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
for _, fn := range modOption {
fn(&sm.option)
}
if sm.option.notifyHandler == nil {
sm.option.notifyHandler = NewHttpNotify(sm.config.HttpNotifyConfig)
if sm.option.NotifyHandler == nil {
sm.option.NotifyHandler = NewHttpNotify(sm.config.HttpNotifyConfig)
}

if sm.config.HttpflvConfig.Enable || sm.config.HttpflvConfig.EnableHttps ||
Expand Down Expand Up @@ -117,7 +117,7 @@ func NewServerManager(confFile string, modOption ...ModOption) *ServerManager {
// ----- implement ILalServer interface --------------------------------------------------------------------------------

func (sm *ServerManager) RunLoop() error {
sm.option.notifyHandler.OnServerStart(sm.StatLalInfo())
sm.option.NotifyHandler.OnServerStart(sm.StatLalInfo())

if sm.config.PprofConfig.Enable {
go runWebPprof(sm.config.PprofConfig.Addr)
Expand Down Expand Up @@ -210,7 +210,7 @@ func (sm *ServerManager) RunLoop() error {
var updateInfo base.UpdateInfo
updateInfo.ServerId = sm.config.ServerId
updateInfo.Groups = sm.StatAllGroup()
sm.option.notifyHandler.OnUpdate(updateInfo)
sm.option.NotifyHandler.OnUpdate(updateInfo)

t := time.NewTicker(1 * time.Second)
defer t.Stop()
Expand Down Expand Up @@ -254,7 +254,7 @@ func (sm *ServerManager) RunLoop() error {
if uis != 0 && (count%uis) == 0 {
updateInfo.ServerId = sm.config.ServerId
updateInfo.Groups = sm.StatAllGroup()
sm.option.notifyHandler.OnUpdate(updateInfo)
sm.option.NotifyHandler.OnUpdate(updateInfo)
}
}
}
Expand Down Expand Up @@ -374,10 +374,11 @@ func (sm *ServerManager) OnRtmpConnect(session *rtmp.ServerSession, opa rtmp.Obj
if tcUrl, err := opa.FindString("tcUrl"); err == nil {
info.TcUrl = tcUrl
}
sm.option.notifyHandler.OnRtmpConnect(info)
sm.option.NotifyHandler.OnRtmpConnect(info)
}

func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
nazalog.Debugf("CHEFERASEME [%s] OnNewRtmpPubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
Expand All @@ -396,11 +397,12 @@ func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) bool {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnPubStart(info)
sm.option.NotifyHandler.OnPubStart(info)
return res
}

func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
nazalog.Debugf("CHEFERASEME [%s] OnDelRtmpPubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
Expand All @@ -421,10 +423,11 @@ func (sm *ServerManager) OnDelRtmpPubSession(session *rtmp.ServerSession) {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnPubStop(info)
sm.option.NotifyHandler.OnPubStop(info)
}

func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
nazalog.Debugf("CHEFERASEME [%s] OnNewRtmpSubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
Expand All @@ -441,12 +444,13 @@ func (sm *ServerManager) OnNewRtmpSubSession(session *rtmp.ServerSession) bool {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStart(info)
sm.option.NotifyHandler.OnSubStart(info)

return true
}

func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
nazalog.Debugf("CHEFERASEME [%s] OnDelRtmpSubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
Expand All @@ -466,12 +470,13 @@ func (sm *ServerManager) OnDelRtmpSubSession(session *rtmp.ServerSession) {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStop(info)
sm.option.NotifyHandler.OnSubStop(info)
}

// ----- implement HttpServerHandlerObserver interface -----------------------------------------------------------------

func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) bool {
nazalog.Debugf("CHEFERASEME [%s] OnNewHttpflvSubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
Expand All @@ -488,11 +493,12 @@ func (sm *ServerManager) OnNewHttpflvSubSession(session *httpflv.SubSession) boo
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStart(info)
sm.option.NotifyHandler.OnSubStart(info)
return true
}

func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
nazalog.Debugf("CHEFERASEME [%s] OnDelHttpflvSubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
Expand All @@ -513,10 +519,11 @@ func (sm *ServerManager) OnDelHttpflvSubSession(session *httpflv.SubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStop(info)
sm.option.NotifyHandler.OnSubStop(info)
}

func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool {
nazalog.Debugf("CHEFERASEME [%s] OnNewHttptsSubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
Expand All @@ -533,11 +540,12 @@ func (sm *ServerManager) OnNewHttptsSubSession(session *httpts.SubSession) bool
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStart(info)
sm.option.NotifyHandler.OnSubStart(info)
return true
}

func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
nazalog.Debugf("CHEFERASEME [%s] OnDelHttptsSubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getGroup(session.AppName(), session.StreamName())
Expand All @@ -558,7 +566,7 @@ func (sm *ServerManager) OnDelHttptsSubSession(session *httpts.SubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStop(info)
sm.option.NotifyHandler.OnSubStop(info)
}

// ----- implement rtsp.ServerObserver interface -----------------------------------------------------------------------
Expand All @@ -572,6 +580,7 @@ func (sm *ServerManager) OnDelRtspSession(session *rtsp.ServerCommandSession) {
}

func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool {
nazalog.Debugf("CHEFERASEME [%s] OnNewRtspPubSession. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
Expand All @@ -588,7 +597,7 @@ func (sm *ServerManager) OnNewRtspPubSession(session *rtsp.PubSession) bool {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnPubStart(info)
sm.option.NotifyHandler.OnPubStart(info)

return res
}
Expand All @@ -615,10 +624,11 @@ func (sm *ServerManager) OnDelRtspPubSession(session *rtsp.PubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnPubStop(info)
sm.option.NotifyHandler.OnPubStop(info)
}

func (sm *ServerManager) OnNewRtspSubSessionDescribe(session *rtsp.SubSession) (ok bool, sdp []byte) {
nazalog.Debugf("CHEFERASEME [%s] OnNewRtspSubSessionDescribe. %s, %s, %s, %s", session.UniqueKey(), session.Url(), session.AppName(), session.StreamName(), session.RawQuery())
sm.mutex.Lock()
defer sm.mutex.Unlock()
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
Expand All @@ -643,7 +653,7 @@ func (sm *ServerManager) OnNewRtspSubSessionPlay(session *rtsp.SubSession) bool
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStart(info)
sm.option.NotifyHandler.OnSubStart(info)

return res
}
Expand All @@ -670,7 +680,7 @@ func (sm *ServerManager) OnDelRtspSubSession(session *rtsp.SubSession) {
info.RemoteAddr = session.GetStat().RemoteAddr
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.notifyHandler.OnSubStop(info)
sm.option.NotifyHandler.OnSubStop(info)
}

// ----- implement IGroupCreator interface -----------------------------------------------------------------------------
Expand Down

0 comments on commit dacfa3d

Please sign in to comment.