diff --git a/trunk/3rdparty/srs-bench/gb28181/gb28181.go b/trunk/3rdparty/srs-bench/gb28181/gb28181.go index f2cf9bb555..917e1a7fac 100644 --- a/trunk/3rdparty/srs-bench/gb28181/gb28181.go +++ b/trunk/3rdparty/srs-bench/gb28181/gb28181.go @@ -116,8 +116,15 @@ func Run(ctx context.Context, r0 interface{}) (err error) { return errors.Wrapf(err, "register %v", conf.sipConfig) } - if err := session.Invite(ctx); err != nil { - return errors.Wrapf(err, "invite %v", conf.sipConfig) + if err := session.HandleRequests(ctx); err != nil { + return errors.Wrapf(err, "handle requests %v", conf.sipConfig) + } + + for ctx.Err() == nil { + if session.out.ssrc != 0 && session.out.mediaPort != 0 { + break + } + time.Sleep(300 * time.Millisecond) } if conf.psConfig.video == "" || conf.psConfig.audio == "" { diff --git a/trunk/3rdparty/srs-bench/gb28181/ingester.go b/trunk/3rdparty/srs-bench/gb28181/ingester.go index 9bcb3270aa..997f705611 100644 --- a/trunk/3rdparty/srs-bench/gb28181/ingester.go +++ b/trunk/3rdparty/srs-bench/gb28181/ingester.go @@ -22,11 +22,8 @@ package gb28181 import ( "context" - "github.com/ghettovoice/gosip/sip" - "github.com/ossrs/go-oryx-lib/errors" - "github.com/ossrs/go-oryx-lib/logger" - "github.com/pion/webrtc/v3/pkg/media/h264reader" - "github.com/yapingcat/gomedia/mpeg2" + "encoding/xml" + "fmt" "io" "os" "path" @@ -34,6 +31,13 @@ import ( "strings" "sync" "time" + + "github.com/ghettovoice/gosip/sip" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" + "github.com/pion/webrtc/v3/pkg/media/h264reader" + "github.com/yapingcat/gomedia/mpeg2" + "golang.org/x/net/html/charset" ) type GBSessionConfig struct { @@ -69,6 +73,9 @@ type GBSession struct { cancel context.CancelFunc // WaitGroup for coroutines. wg sync.WaitGroup + // 于处理请的上下文和取消函数 + requestCtx context.Context + requestCancel context.CancelFunc } func NewGBSession(c *GBSessionConfig, sc *SIPConfig) *GBSession { @@ -249,6 +256,144 @@ func (v *GBSession) UnRegister(ctx context.Context) error { return ctx.Err() } +func (v *GBSession) HandleRequests(ctx context.Context) error { + v.requestCtx, v.requestCancel = context.WithCancel(ctx) + client := v.sip + + v.wg.Add(1) + go func() { + defer v.wg.Done() + + for v.requestCtx.Err() == nil { + req, err := client.WaitRequest(v.requestCtx) + if err != nil { + if v.requestCtx.Err() == nil { + logger.Ef(ctx, "Wait request err %+v", err) + } + return + } + + // 根据请求类型处理不同消息 + if r, ok := req.(sip.Request); ok { + switch r.Method() { + case sip.INVITE: + if v.onInviteRequest != nil { + if err = v.onInviteRequest(req); err != nil { + logger.Ef(ctx, "Handle invite err %+v", err) + continue + } + } + + if err = client.Trying(ctx, req); err != nil { + logger.Ef(ctx, "Response trying err %+v", err) + continue + } + time.Sleep(100 * time.Millisecond) + + res, err := client.InviteResponse(ctx, req) + if err != nil { + logger.Ef(ctx, "Response invite err %+v", err) + continue + } + + // 解析 SSRC 和媒体端口 + offer := req.Body() + if err = v.parseInviteOffer(offer); err != nil { + logger.Ef(ctx, "Parse invite offer err %+v", err) + continue + } + + logger.Tf(ctx, "Handle invite id=%v, response=%v, ssrc=%v, mediaPort=%v", + req.MessageID(), res.MessageID(), v.out.ssrc, v.out.mediaPort) + + if v.onInviteOkAck != nil { + if err = v.onInviteOkAck(req, res); err != nil { + logger.Ef(ctx, "Invite ok ack err %+v", err) + continue + } + } + case sip.MESSAGE: + if v.onMessageHeartbeat != nil { + // 解析 XML 消息体 + type XMLMessage struct { + XMLName xml.Name `xml:"Notify"` + CmdType string + SN int + DeviceID string + } + + temp := &XMLMessage{} + decoder := xml.NewDecoder(strings.NewReader(req.Body())) + decoder.CharsetReader = charset.NewReaderLabel + if err := decoder.Decode(temp); err != nil { + logger.Ef(ctx, "decode message error: %s\n message:%s", err.Error(), req.Body()) + continue + } + + // 处理不同类型的命令 + switch temp.CmdType { + case "Catalog": + logger.Tf(ctx, "Catalog id=%v, ssrc=%v, mediaPort=%v", + req.MessageID(), v.out.ssrc, v.out.mediaPort) + + // 构造响应消息体 + body := strings.Join([]string{ + ``, + "", + "Catalog", + fmt.Sprintf("%d", temp.SN), + fmt.Sprintf("%s", temp.DeviceID), + "1", + "", + "", + fmt.Sprintf("%s", temp.DeviceID), + "Camera", + "SRS", + "GB28181", + "SRS", + "34020000", + "
192.168.1.100
", + "0", + "0", + "1", + "0", + "ON", + "
", + "
", + "
", + }, "\n") + + // 发送200 OK响应 + res := sip.NewResponseFromRequest("", r, 200, "OK", body) + if err = client.Send(res); err != nil { + logger.Ef(ctx, "Response catalog err %+v", err) + continue + } + } + } + } + } + } + }() + + return nil +} + +func (v *GBSession) parseInviteOffer(offer string) error { + var err error + ssrcStr := strings.Split(strings.Split(offer, "y=")[1], "\r\n")[0] + if v.out.ssrc, err = strconv.ParseInt(ssrcStr, 10, 64); err != nil { + return errors.Wrapf(err, "parse ssrc=%v", ssrcStr) + } + + mediaPortStr := strings.Split(strings.Split(offer, "m=video")[1], " ")[1] + if v.out.mediaPort, err = strconv.ParseInt(mediaPortStr, 10, 64); err != nil { + return errors.Wrapf(err, "parse media port=%v", mediaPortStr) + } + + return nil +} + type IngesterConfig struct { psConfig PSConfig ssrc uint32 diff --git a/trunk/3rdparty/srs-bench/gb28181/sip.go b/trunk/3rdparty/srs-bench/gb28181/sip.go index 1bc8cd0d30..3a88eded95 100644 --- a/trunk/3rdparty/srs-bench/gb28181/sip.go +++ b/trunk/3rdparty/srs-bench/gb28181/sip.go @@ -23,16 +23,17 @@ package gb28181 import ( "context" "fmt" - "github.com/ghettovoice/gosip/log" - "github.com/ghettovoice/gosip/sip" - "github.com/ghettovoice/gosip/transport" - "github.com/ossrs/go-oryx-lib/errors" - "github.com/ossrs/go-oryx-lib/logger" "math/rand" "net/url" "strings" "sync" "time" + + "github.com/ghettovoice/gosip/log" + "github.com/ghettovoice/gosip/sip" + "github.com/ghettovoice/gosip/transport" + "github.com/ossrs/go-oryx-lib/errors" + "github.com/ossrs/go-oryx-lib/logger" ) type SIPConfig struct { @@ -474,6 +475,20 @@ func (v *SIPSession) Wait(ctx context.Context, method sip.RequestMethod) (sip.Me } } +func (v *SIPSession) WaitRequest(ctx context.Context) (sip.Message, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case req := <-v.requests: + return req, nil + } +} + +// Send sends a SIP message through the client +func (v *SIPSession) Send(msg sip.Message) error { + return v.client.Send(msg) +} + type SIPClient struct { ctx context.Context cancel context.CancelFunc diff --git a/trunk/3rdparty/srs-bench/go.mod b/trunk/3rdparty/srs-bench/go.mod index d4667a1a5a..e70b77a1a0 100644 --- a/trunk/3rdparty/srs-bench/go.mod +++ b/trunk/3rdparty/srs-bench/go.mod @@ -18,6 +18,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/yapingcat/gomedia/codec v0.0.0-20220617074658-94762898dc25 github.com/yapingcat/gomedia/mpeg2 v0.0.0-20220617074658-94762898dc25 + golang.org/x/net v0.21.0 ) require ( @@ -46,8 +47,8 @@ require ( github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 // indirect github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect golang.org/x/crypto v0.21.0 // indirect - golang.org/x/net v0.21.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect + golang.org/x/text v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect )