Skip to content

Commit

Permalink
add Catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
duiniuluantanqin committed Dec 23, 2024
1 parent 7416134 commit d64943e
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 13 deletions.
11 changes: 9 additions & 2 deletions trunk/3rdparty/srs-bench/gb28181/gb28181.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,15 @@ func Run(ctx context.Context, r0 interface{}) (err error) {
return errors.Wrapf(err, "register %v", conf.sipConfig)
}

if err := session.Invite(ctx); err != nil {
return errors.Wrapf(err, "invite %v", conf.sipConfig)
if err := session.HandleRequests(ctx); err != nil {
return errors.Wrapf(err, "handle requests %v", conf.sipConfig)
}

for ctx.Err() == nil {
if session.out.ssrc != 0 && session.out.mediaPort != 0 {
break
}
time.Sleep(300 * time.Millisecond)
}

if conf.psConfig.video == "" || conf.psConfig.audio == "" {
Expand Down
155 changes: 150 additions & 5 deletions trunk/3rdparty/srs-bench/gb28181/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,22 @@ package gb28181

import (
"context"
"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/pion/webrtc/v3/pkg/media/h264reader"
"github.com/yapingcat/gomedia/mpeg2"
"encoding/xml"
"fmt"
"io"
"os"
"path"
"strconv"
"strings"
"sync"
"time"

"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/pion/webrtc/v3/pkg/media/h264reader"
"github.com/yapingcat/gomedia/mpeg2"
"golang.org/x/net/html/charset"
)

type GBSessionConfig struct {
Expand Down Expand Up @@ -69,6 +73,9 @@ type GBSession struct {
cancel context.CancelFunc
// WaitGroup for coroutines.
wg sync.WaitGroup
// 于处理请的上下文和取消函数
requestCtx context.Context
requestCancel context.CancelFunc
}

func NewGBSession(c *GBSessionConfig, sc *SIPConfig) *GBSession {
Expand Down Expand Up @@ -249,6 +256,144 @@ func (v *GBSession) UnRegister(ctx context.Context) error {
return ctx.Err()
}

func (v *GBSession) HandleRequests(ctx context.Context) error {
v.requestCtx, v.requestCancel = context.WithCancel(ctx)
client := v.sip

v.wg.Add(1)
go func() {
defer v.wg.Done()

for v.requestCtx.Err() == nil {
req, err := client.WaitRequest(v.requestCtx)
if err != nil {
if v.requestCtx.Err() == nil {
logger.Ef(ctx, "Wait request err %+v", err)
}
return
}

// 根据请求类型处理不同消息
if r, ok := req.(sip.Request); ok {
switch r.Method() {
case sip.INVITE:
if v.onInviteRequest != nil {
if err = v.onInviteRequest(req); err != nil {
logger.Ef(ctx, "Handle invite err %+v", err)
continue
}
}

if err = client.Trying(ctx, req); err != nil {
logger.Ef(ctx, "Response trying err %+v", err)
continue
}
time.Sleep(100 * time.Millisecond)

res, err := client.InviteResponse(ctx, req)
if err != nil {
logger.Ef(ctx, "Response invite err %+v", err)
continue
}

// 解析 SSRC 和媒体端口
offer := req.Body()
if err = v.parseInviteOffer(offer); err != nil {
logger.Ef(ctx, "Parse invite offer err %+v", err)
continue
}

logger.Tf(ctx, "Handle invite id=%v, response=%v, ssrc=%v, mediaPort=%v",
req.MessageID(), res.MessageID(), v.out.ssrc, v.out.mediaPort)

if v.onInviteOkAck != nil {
if err = v.onInviteOkAck(req, res); err != nil {
logger.Ef(ctx, "Invite ok ack err %+v", err)
continue
}
}
case sip.MESSAGE:
if v.onMessageHeartbeat != nil {
// 解析 XML 消息体
type XMLMessage struct {
XMLName xml.Name `xml:"Notify"`
CmdType string
SN int
DeviceID string
}

temp := &XMLMessage{}
decoder := xml.NewDecoder(strings.NewReader(req.Body()))
decoder.CharsetReader = charset.NewReaderLabel
if err := decoder.Decode(temp); err != nil {
logger.Ef(ctx, "decode message error: %s\n message:%s", err.Error(), req.Body())
continue
}

// 处理不同类型的命令
switch temp.CmdType {
case "Catalog":
logger.Tf(ctx, "Catalog id=%v, ssrc=%v, mediaPort=%v",
req.MessageID(), v.out.ssrc, v.out.mediaPort)

// 构造响应消息体
body := strings.Join([]string{
`<?xml version="1.0" encoding="GB2312"?>`,
"<Response>",
"<CmdType>Catalog</CmdType>",
fmt.Sprintf("<SN>%d</SN>", temp.SN),
fmt.Sprintf("<DeviceID>%s</DeviceID>", temp.DeviceID),
"<SumNum>1</SumNum>",
"<DeviceList Num=\"1\">",
"<Item>",
fmt.Sprintf("<DeviceID>%s</DeviceID>", temp.DeviceID),
"<Name>Camera</Name>",
"<Manufacturer>SRS</Manufacturer>",
"<Model>GB28181</Model>",
"<Owner>SRS</Owner>",
"<CivilCode>34020000</CivilCode>",
"<Address>192.168.1.100</Address>",
"<Parental>0</Parental>",
"<SafetyWay>0</SafetyWay>",
"<RegisterWay>1</RegisterWay>",
"<Secrecy>0</Secrecy>",
"<Status>ON</Status>",
"</Item>",
"</DeviceList>",
"</Response>",
}, "\n")

// 发送200 OK响应
res := sip.NewResponseFromRequest("", r, 200, "OK", body)
if err = client.Send(res); err != nil {
logger.Ef(ctx, "Response catalog err %+v", err)
continue
}
}
}
}
}
}
}()

return nil
}

func (v *GBSession) parseInviteOffer(offer string) error {
var err error
ssrcStr := strings.Split(strings.Split(offer, "y=")[1], "\r\n")[0]
if v.out.ssrc, err = strconv.ParseInt(ssrcStr, 10, 64); err != nil {
return errors.Wrapf(err, "parse ssrc=%v", ssrcStr)
}

mediaPortStr := strings.Split(strings.Split(offer, "m=video")[1], " ")[1]
if v.out.mediaPort, err = strconv.ParseInt(mediaPortStr, 10, 64); err != nil {
return errors.Wrapf(err, "parse media port=%v", mediaPortStr)
}

return nil
}

type IngesterConfig struct {
psConfig PSConfig
ssrc uint32
Expand Down
25 changes: 20 additions & 5 deletions trunk/3rdparty/srs-bench/gb28181/sip.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ package gb28181
import (
"context"
"fmt"
"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transport"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"math/rand"
"net/url"
"strings"
"sync"
"time"

"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transport"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
)

type SIPConfig struct {
Expand Down Expand Up @@ -474,6 +475,20 @@ func (v *SIPSession) Wait(ctx context.Context, method sip.RequestMethod) (sip.Me
}
}

func (v *SIPSession) WaitRequest(ctx context.Context) (sip.Message, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case req := <-v.requests:
return req, nil
}
}

// Send sends a SIP message through the client
func (v *SIPSession) Send(msg sip.Message) error {
return v.client.Send(msg)
}

type SIPClient struct {
ctx context.Context
cancel context.CancelFunc
Expand Down
3 changes: 2 additions & 1 deletion trunk/3rdparty/srs-bench/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/yapingcat/gomedia/codec v0.0.0-20220617074658-94762898dc25
github.com/yapingcat/gomedia/mpeg2 v0.0.0-20220617074658-94762898dc25
golang.org/x/net v0.21.0
)

require (
Expand Down Expand Up @@ -46,8 +47,8 @@ require (
github.com/tevino/abool v0.0.0-20170917061928-9b9efcf221b5 // indirect
github.com/x-cray/logrus-prefixed-formatter v0.5.2 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

0 comments on commit d64943e

Please sign in to comment.