diff --git a/.dockerignore b/.dockerignore index 9316bdd3b..7dda417f0 100644 --- a/.dockerignore +++ b/.dockerignore @@ -7,7 +7,6 @@ islb.Dockerfile web.Dockerfile docker-compose.yml docker -configs docs screenshots */*/node_modules diff --git a/cmd/avp/main.go b/cmd/avp/main.go index bcc64196c..d07985cb0 100644 --- a/cmd/avp/main.go +++ b/cmd/avp/main.go @@ -2,32 +2,57 @@ package main import ( "errors" + "fmt" "net/http" _ "net/http/pprof" + "path" + elements "github.com/pion/ion-elements" conf "github.com/pion/ion/pkg/conf/avp" "github.com/pion/ion/pkg/discovery" "github.com/pion/ion/pkg/log" "github.com/pion/ion/pkg/node/avp" "github.com/pion/ion/pkg/process" - "github.com/pion/ion/pkg/process/elements" "github.com/pion/ion/pkg/process/samples" "github.com/pion/ion/pkg/proto" ) -func getDefaultElements(id string) map[string]elements.Element { - de := make(map[string]elements.Element) +func getDefaultElements(id string) map[string]process.Element { + de := make(map[string]process.Element) if conf.Pipeline.WebmSaver.Enabled && conf.Pipeline.WebmSaver.DefaultOn { - webm := elements.NewWebmSaver(id) - de[elements.TypeWebmSaver] = webm + filewriter := elements.NewFileWriter(elements.FileWriterConfig{ + ID: id, + Path: path.Join(conf.Pipeline.WebmSaver.Path, fmt.Sprintf("%s.webm", id)), + }) + webm := elements.NewWebmSaver(elements.WebmSaverConfig{ + ID: id, + }) + err := webm.Attach(filewriter) + if err != nil { + log.Errorf("error attaching filewriter to webm %s", err) + } else { + de[elements.TypeWebmSaver] = webm + } } return de } -func getTogglableElement(msg proto.ElementInfo) (elements.Element, error) { +func getTogglableElement(msg proto.ElementInfo) (process.Element, error) { switch msg.Type { case elements.TypeWebmSaver: - return elements.NewWebmSaver(msg.MID), nil + filewriter := elements.NewFileWriter(elements.FileWriterConfig{ + ID: msg.MID, + Path: path.Join(conf.Pipeline.WebmSaver.Path, fmt.Sprintf("%s.webm", msg.MID)), + }) + webm := elements.NewWebmSaver(elements.WebmSaverConfig{ + ID: msg.MID, + }) + err := webm.Attach(filewriter) + if err != nil { + log.Errorf("error attaching filewriter to webm %s", err) + return nil, err + } + return webm, nil } return nil, errors.New("element not found") diff --git a/codecov.yml b/codecov.yml index da6635317..44be0a36d 100644 --- a/codecov.yml +++ b/codecov.yml @@ -10,10 +10,7 @@ coverage: default: # Allow decreasing 2% of total coverage to avoid noise. threshold: 2% - # patch: - # default: - # target: 30% - # only_pulls: true + patch: off ignore: - "cmd/*" diff --git a/docker-compose.stable.yml b/docker-compose.stable.yml index 1811e7ea5..f79f86fad 100644 --- a/docker-compose.stable.yml +++ b/docker-compose.stable.yml @@ -2,10 +2,7 @@ version: "3.7" services: sfu: - image: pionwebrtc/ion-sfu:v0.4.1 - command: "-c /configs/sfu.toml" - volumes: - - "./configs/docker/sfu.toml:/configs/sfu.toml" + image: pionwebrtc/ion-sfu:v0.4.4 ports: - "5000-5200:5000-5200/udp" depends_on: @@ -15,10 +12,7 @@ services: - ionnet biz: - image: pionwebrtc/ion-biz:v0.4.1 - command: "-c /configs/biz.toml" - volumes: - - "./configs/docker/biz.toml:/configs/biz.toml" + image: pionwebrtc/ion-biz:v0.4.4 ports: - 8443:8443 networks: @@ -28,10 +22,7 @@ services: - etcd islb: - image: pionwebrtc/ion-islb:v0.4.1 - command: "-c /configs/islb.toml" - volumes: - - "./configs/docker/islb.toml:/configs/islb.toml" + image: pionwebrtc/ion-islb:v0.4.4 depends_on: - nats - etcd diff --git a/docker/avp.Dockerfile b/docker/avp.Dockerfile index e0d540b3c..a5e32877d 100644 --- a/docker/avp.Dockerfile +++ b/docker/avp.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.14.2-stretch +FROM golang:1.14.4-stretch ENV GO111MODULE=on @@ -18,5 +18,7 @@ FROM alpine:3.12.0 RUN apk --no-cache add ca-certificates COPY --from=0 /avp /usr/local/bin/avp +COPY configs/docker/avp.toml /configs/avp.toml + ENTRYPOINT ["/usr/local/bin/avp"] CMD ["-c", "/configs/avp.toml"] diff --git a/docker/biz.Dockerfile b/docker/biz.Dockerfile index f6a55e953..76322c333 100644 --- a/docker/biz.Dockerfile +++ b/docker/biz.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.14.3-stretch +FROM golang:1.14.4-stretch ENV GO111MODULE=on @@ -18,5 +18,7 @@ FROM alpine:3.12.0 RUN apk --no-cache add ca-certificates COPY --from=0 /biz /usr/local/bin/biz +COPY configs/docker/biz.toml /configs/biz.toml + ENTRYPOINT ["/usr/local/bin/biz"] CMD ["-c", "/configs/biz.toml"] diff --git a/docker/islb.Dockerfile b/docker/islb.Dockerfile index 00dfce0ae..2267a1440 100644 --- a/docker/islb.Dockerfile +++ b/docker/islb.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.14.3-stretch +FROM golang:1.14.4-stretch ENV GO111MODULE=on @@ -17,5 +17,7 @@ FROM alpine:3.12.0 RUN apk --no-cache add ca-certificates COPY --from=0 /islb /usr/local/bin/islb +COPY configs/docker/islb.toml /configs/islb.toml + ENTRYPOINT ["/usr/local/bin/islb"] CMD ["-c", "/configs/islb.toml"] diff --git a/docker/sfu.Dockerfile b/docker/sfu.Dockerfile index 5b7c33113..da51ededd 100644 --- a/docker/sfu.Dockerfile +++ b/docker/sfu.Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.14.3-stretch +FROM golang:1.14.4-stretch ENV GO111MODULE=on @@ -18,5 +18,7 @@ FROM alpine:3.12.0 RUN apk --no-cache add ca-certificates COPY --from=0 /sfu /usr/local/bin/sfu +COPY configs/docker/sfu.toml /configs/sfu.toml + ENTRYPOINT ["/usr/local/bin/sfu"] CMD ["-c", "/configs/sfu.toml"] diff --git a/go.mod b/go.mod index 2096b543d..5f690a7e6 100644 --- a/go.mod +++ b/go.mod @@ -5,18 +5,16 @@ go 1.13 require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/at-wat/ebml-go v0.11.0 - github.com/cloudwebrtc/go-protoo v0.0.0-20200510140124-cf4744e87257 - github.com/cloudwebrtc/nats-protoo v0.0.0-20200328144814-d3c1c848d442 - github.com/coreos/etcd v3.3.22+incompatible // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/cloudwebrtc/go-protoo v0.0.0-20200602160428-0a199e23f7e0 + github.com/cloudwebrtc/nats-protoo v0.0.0-20200602160410-25aa7ab703af github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-redis/redis/v7 v7.3.0 github.com/google/uuid v1.1.1 github.com/json-iterator/go v1.1.9 // indirect github.com/klauspost/cpuid v1.2.3 // indirect github.com/klauspost/reedsolomon v1.9.3 // indirect - github.com/nats-io/nats-server/v2 v2.1.4 // indirect github.com/notedit/sdp v0.0.4 + github.com/pion/ion-elements v0.1.1 // indirect github.com/pion/rtcp v1.2.3 github.com/pion/rtp v1.5.4 github.com/pion/stun v0.3.5 @@ -30,7 +28,7 @@ require ( github.com/tjfoc/gmsm v1.3.1 // indirect github.com/xtaci/kcp-go v5.4.20+incompatible github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37 // indirect - go.etcd.io/etcd v3.3.22+incompatible + go.etcd.io/etcd v3.3.4+incompatible golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 - sigs.k8s.io/yaml v1.2.0 // indirect + gopkg.in/yaml.v2 v2.2.8 // indirect ) diff --git a/go.sum b/go.sum index 31ee4d710..496e8611f 100644 --- a/go.sum +++ b/go.sum @@ -39,10 +39,10 @@ github.com/cheekybits/genny v1.0.0/go.mod h1:+tQajlRqAUrPI7DOSpB0XAqZYtQakVtB7wX github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 h1:xz6Nv3zcwO2Lila35hcb0QloCQsc38Al13RNEzWRpX4= github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9/go.mod h1:2wSM9zJkl1UQEFZgSd68NfCgRz1VL1jzy/RjCg+ULrs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudwebrtc/go-protoo v0.0.0-20200510140124-cf4744e87257 h1:wFUOcfpZkIkTj3VLzzDGFkpXIkP7N/JOnliKLpCqjyY= -github.com/cloudwebrtc/go-protoo v0.0.0-20200510140124-cf4744e87257/go.mod h1:Q0DiItmsD5iCBdeID9Xu03ok8bemc78XJ+0rYATQbuQ= -github.com/cloudwebrtc/nats-protoo v0.0.0-20200328144814-d3c1c848d442 h1:jQwLu2qva1swuN+HcWSpr+SrJ/zPScQegf6gVG0fiok= -github.com/cloudwebrtc/nats-protoo v0.0.0-20200328144814-d3c1c848d442/go.mod h1:GChOgYiUBBZTCzjSPbPCDIQCAtScaaJmvb41Vw83TLQ= +github.com/cloudwebrtc/go-protoo v0.0.0-20200602160428-0a199e23f7e0 h1:7lmqBSdb1ILwUJalqJdIoWPH0cnnQt4NshxXnVvrmx0= +github.com/cloudwebrtc/go-protoo v0.0.0-20200602160428-0a199e23f7e0/go.mod h1:Q0DiItmsD5iCBdeID9Xu03ok8bemc78XJ+0rYATQbuQ= +github.com/cloudwebrtc/nats-protoo v0.0.0-20200602160410-25aa7ab703af h1:M6ONhwD/EXjYLVdw23/xFkcQUEEj3tn500N3S6d4bLA= +github.com/cloudwebrtc/nats-protoo v0.0.0-20200602160410-25aa7ab703af/go.mod h1:zwKwTqbrcBl9o2AHopHYlMh4KM3wMQHYeR6TrtoRCQ0= github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= @@ -60,11 +60,11 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -91,11 +91,22 @@ github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= @@ -142,6 +153,7 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9 h1:9yzud/Ht36ygwatGx56VwCZtlI/2AD15T1X2sjSuGns= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -190,18 +202,16 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= -github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g= -github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg= -github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= -github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= -github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nats-server/v2 v2.1.7 h1:jCoQwDvRYJy3OpOTHeYfvIPLP46BMeDmH7XEJg/r42I= +github.com/nats-io/nats-server/v2 v2.1.7/go.mod h1:rbRrRE/Iv93O/rUvZ9dh4NfT0Cm9HWjW/BqOWLGgYiE= +github.com/nats-io/nats.go v1.10.0 h1:L8qnKaofSfNFbXg0C5F71LdjPRnmQwSsA4ukmkt1TvY= +github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.1.4 h1:aEsHIssIk6ETN5m2/MD8Y4B2X7FfXrBAUdkyRvbVYzA= +github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/notedit/sdp v0.0.4 h1:P4L8HbZ8SfzrRDE2m3zPnkHhcSdr/0sZkapKo0lyDJs= @@ -223,6 +233,17 @@ github.com/pion/dtls/v2 v2.0.0 h1:Fk+MBhLZ/U1bImzAhmzwbO/pP2rKhtTw8iA934H3ybE= github.com/pion/dtls/v2 v2.0.0/go.mod h1:VkY5VL2wtsQQOG60xQ4lkV5pdn0wwBBTzCfRJqXhp3A= github.com/pion/ice v0.7.15 h1:s1In+gnuyVq7WKWGVQL+1p+OcrMsbfL+VfSe2isH8Ag= github.com/pion/ice v0.7.15/go.mod h1:Z6zybEQgky5mZkKcLfmvc266JukK2srz3VZBBD1iXBw= +github.com/pion/ion v0.4.2-0.20200602171638-d7dc6b713737/go.mod h1:pzWW8GmeHqhuFfoyuhgUsNFcHsAihgIxvdRLFmyv4lM= +github.com/pion/ion-elements v0.0.0-20200602184012-6219471db5b9 h1:QvYlkLQWQea44i2GThoV7TmAgxRZAZ7QJbYjTXzbgaQ= +github.com/pion/ion-elements v0.0.0-20200602184012-6219471db5b9/go.mod h1:8ptjqR31WjjKkRZQlWP1el4t1VHUfkdyDGJaBhd2OGM= +github.com/pion/ion-elements v0.1.0 h1:8ARiVz+zX1p8Ji9tWiYSr3UDC1877k5WGOPO2CAdbhg= +github.com/pion/ion-elements v0.1.0/go.mod h1:8ptjqR31WjjKkRZQlWP1el4t1VHUfkdyDGJaBhd2OGM= +github.com/pion/ion-elements v0.1.1-0.20200603152220-97e954fa8147 h1:vMimzA4eshiRuTtC+sTQtJCCWLuVP1MMtP96eq6aaos= +github.com/pion/ion-elements v0.1.1-0.20200603152220-97e954fa8147/go.mod h1:8ptjqR31WjjKkRZQlWP1el4t1VHUfkdyDGJaBhd2OGM= +github.com/pion/ion-elements v0.1.1-0.20200603153622-8d2b6b850c9b h1:ko42eDxfJcqNjkSgi3iM0jSF/pt0K3E2i/eApgrl4tE= +github.com/pion/ion-elements v0.1.1-0.20200603153622-8d2b6b850c9b/go.mod h1:8ptjqR31WjjKkRZQlWP1el4t1VHUfkdyDGJaBhd2OGM= +github.com/pion/ion-elements v0.1.1 h1:loeBUKq1YAOOw5swvsdHfb9B/6KiipiTvS0BDmxc18g= +github.com/pion/ion-elements v0.1.1/go.mod h1:8ptjqR31WjjKkRZQlWP1el4t1VHUfkdyDGJaBhd2OGM= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/mdns v0.0.4 h1:O4vvVqr4DGX63vzmO6Fw9vpy3lfztVWHGCQfyw0ZLSY= @@ -233,9 +254,7 @@ github.com/pion/rtcp v1.2.1 h1:S3yG4KpYAiSmBVqKAfgRa5JdwBNj4zK3RLUa8JYdhak= github.com/pion/rtcp v1.2.1/go.mod h1:a5dj2d6BKIKHl43EnAOIrCczcjESrtPuMgfmL6/K6QM= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I= -github.com/pion/rtp v1.4.0 h1:EkeHEXKuJhZoRUxtL2Ie80vVg9gBH+poT9UoL8M14nw= github.com/pion/rtp v1.4.0/go.mod h1:/l4cvcKd0D3u9JLs2xSVI95YkfXW87a3br3nqmVtSlE= -github.com/pion/rtp v1.5.3 h1:5OPyxyTa1zclKP6mOaQFuxbslGEekIdnqGu9iR2Hvg4= github.com/pion/rtp v1.5.3/go.mod h1:bg60AL5GotNOlYZsqycbhDtEV3TkfbpXG0KBiUq29Mg= github.com/pion/rtp v1.5.4 h1:PuNg6xqV3brIUihatcKZj1YDUs+M45L0ZbrZWYtkDxY= github.com/pion/rtp v1.5.4/go.mod h1:bg60AL5GotNOlYZsqycbhDtEV3TkfbpXG0KBiUq29Mg= @@ -480,6 +499,15 @@ google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZi google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0 h1:cJv5/xdbk1NnMPR1VP9+HU6gupuG9MLBoH1r6RHZ2MY= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -506,5 +534,4 @@ honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= -sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= diff --git a/pkg/conf/avp/conf.go b/pkg/conf/avp/conf.go index 5df29dca1..60154e209 100644 --- a/pkg/conf/avp/conf.go +++ b/pkg/conf/avp/conf.go @@ -98,7 +98,7 @@ func (c *Config) load() bool { fmt.Printf("config file %s read failed. %v\n", c.CfgFile, err) return false } - err = viper.GetViper().UnmarshalExact(c) + err = viper.GetViper().Unmarshal(c) if err != nil { fmt.Printf("config file %s loaded failed. %v\n", c.CfgFile, err) return false diff --git a/pkg/db/redis_test.go b/pkg/db/redis_test.go index 810297507..a22982690 100644 --- a/pkg/db/redis_test.go +++ b/pkg/db/redis_test.go @@ -12,9 +12,9 @@ var ( db *Redis dc = "dc1" node = "sfu1" - room = "room1" - uid = "uuid-xxxxx-xxxxx-xxxxx-xxxxx" - mid = "mid-xxxxx-xxxxx-xxxxx-xxxxx" + room = proto.RID("room1") + uid = proto.UID("uuid-xxxxx-xxxxx-xxxxx-xxxxx") + mid = proto.MID("mid-xxxxx-xxxxx-xxxxx-xxxxx") msid0 = "pion audio" msid1 = "pion video" track0 = proto.TrackInfo{Ssrc: 3694449886, Payload: 111, Type: "audio", ID: "aid0"} @@ -36,7 +36,7 @@ var ( ukey = proto.UserInfo{ DC: dc, RID: room, - UID: room, + UID: uid, }.BuildKey() ) diff --git a/pkg/discovery/node_test.go b/pkg/discovery/node_test.go index c145f2d26..d123cdd85 100644 --- a/pkg/discovery/node_test.go +++ b/pkg/discovery/node_test.go @@ -35,15 +35,13 @@ func ServiceNodeRegistry() { serviceNode.RegisterNode("sfu", "node-name", "nats-channel-test") protoo := nprotoo.NewNatsProtoo(nprotoo.DefaultNatsURL) wg.Add(1) - protoo.OnRequest(serviceNode.GetRPCChannel(), func(request map[string]interface{}, accept nprotoo.AcceptFunc, reject nprotoo.RejectFunc) { - method := request["method"].(string) - data := request["data"].(map[string]interface{}) - log.Infof("method => %s, data => %v", method, data) + protoo.OnRequest(serviceNode.GetRPCChannel(), func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { + log.Infof("method => %s, data => %v", request.Method, request.Data) reject(404, "Not found") wg.Done() }) - protoo.OnBroadcast(serviceNode.GetEventChannel(), func(data map[string]interface{}, subj string) { + protoo.OnBroadcast(serviceNode.GetEventChannel(), func(data nprotoo.Notification, subj string) { log.Infof("Got Broadcast subj => %s, data => %v", subj, data) wg.Done() }) @@ -62,7 +60,7 @@ func ServiceNodeWatch() { req := protoo.NewRequestor(GetRPCChannel(node)) wg.Add(1) req.Request("offer", JsonEncode(`{ "sdp": "dummy-sdp"}`), - func(result map[string]interface{}) { + func(result nprotoo.RawMessage) { log.Infof("offer success: => %s", result) }, func(code int, err string) { diff --git a/pkg/node/avp/internal.go b/pkg/node/avp/internal.go index cbd9211e4..566bd371b 100644 --- a/pkg/node/avp/internal.go +++ b/pkg/node/avp/internal.go @@ -13,9 +13,9 @@ import ( func handleRequest(rpcID string) { log.Infof("handleRequest: rpcID => [%v]", rpcID) - protoo.OnRequest(rpcID, func(request map[string]interface{}, accept nprotoo.AcceptFunc, reject nprotoo.RejectFunc) { - method := request["method"].(string) - data := json.RawMessage(request["data"].([]byte)) + protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { + method := request.Method + data := request.Data log.Debugf("handleRequest: method => %s, data => %v", method, data) var proc proto.ElementInfo diff --git a/pkg/node/biz/client.go b/pkg/node/biz/client.go index b2c8c7624..f24d862f0 100644 --- a/pkg/node/biz/client.go +++ b/pkg/node/biz/client.go @@ -1,8 +1,6 @@ package biz import ( - "encoding/json" - nprotoo "github.com/cloudwebrtc/nats-protoo" "github.com/pion/ion/pkg/log" "github.com/pion/ion/pkg/proto" @@ -26,10 +24,10 @@ func login(peer *signal.Peer, msg proto.LoginMsg) (interface{}, *nprotoo.Error) // join room func join(peer *signal.Peer, msg proto.JoinMsg) (interface{}, *nprotoo.Error) { log.Infof("biz.join peer.ID()=%s msg=%v", peer.ID(), msg) - rid := msg.Rid + rid := msg.RID // Validate - if msg.Rid == "" { + if msg.RID == "" { return nil, ridError } @@ -46,37 +44,28 @@ func join(peer *signal.Peer, msg proto.JoinMsg) (interface{}, *nprotoo.Error) { // Send join => islb info := msg.Info uid := peer.ID() - islb.SyncRequest(proto.IslbClientOnJoin, util.Map("rid", rid, "uid", uid, "info", info)) + _, err := islb.SyncRequest(proto.IslbClientOnJoin, util.Map("rid", rid, "uid", uid, "info", info)) + if err != nil { + log.Errorf("IslbClientOnJoin failed %v", err.Error()) + } // Send getPubs => islb - islb.AsyncRequest(proto.IslbGetPubs, util.Map("rid", rid, "uid", uid)).Then( - func(result map[string]interface{}) { - log.Infof("IslbGetPubs: result=%v", result) - if result["pubs"] == nil { + islb.AsyncRequest(proto.IslbGetPubs, msg.RoomInfo).Then( + func(result nprotoo.RawMessage) { + var resMsg proto.GetPubResp + if err := result.Unmarshal(&resMsg); err != nil { + log.Errorf("Unmarshal pub response %v", err) return } - pubs := result["pubs"].([]interface{}) - for _, pub := range pubs { - info := pub.(map[string]interface{})["info"].(string) - mid := pub.(map[string]interface{})["mid"].(string) - uid := pub.(map[string]interface{})["uid"].(string) - rid := result["rid"].(string) - tracks := pub.(map[string]interface{})["tracks"].(map[string]interface{}) - - var infoObj map[string]interface{} - err := json.Unmarshal([]byte(info), &infoObj) - if err != nil { - log.Errorf("json.Unmarshal: err=%v", err) - } - log.Infof("IslbGetPubs: mid=%v info=%v", mid, infoObj) - // peer <= range pubs(mid) - if mid != "" { - peer.Notify(proto.ClientOnStreamAdd, util.Map("rid", rid, "uid", uid, "mid", mid, "info", infoObj, "tracks", tracks)) + log.Infof("IslbGetPubs: result=%v", result) + for _, pub := range resMsg.Pubs { + if pub.MID == "" { + continue } + notif := proto.StreamAddMsg(pub) + peer.Notify(proto.ClientOnStreamAdd, notif) } }, - func(err *nprotoo.Error) { - - }) + func(err *nprotoo.Error) {}) return emptyMap, nil } @@ -85,10 +74,10 @@ func leave(peer *signal.Peer, msg proto.LeaveMsg) (interface{}, *nprotoo.Error) log.Infof("biz.leave peer.ID()=%s msg=%v", peer.ID(), msg) defer util.Recover("biz.leave") - rid := msg.Rid + rid := msg.RID // Validate - if msg.Rid == "" { + if msg.RID == "" { return nil, ridError } @@ -100,7 +89,10 @@ func leave(peer *signal.Peer, msg proto.LeaveMsg) (interface{}, *nprotoo.Error) } islb.AsyncRequest(proto.IslbOnStreamRemove, util.Map("rid", rid, "uid", uid, "mid", "")) - islb.SyncRequest(proto.IslbClientOnLeave, util.Map("rid", rid, "uid", uid)) + _, err := islb.SyncRequest(proto.IslbClientOnLeave, util.Map("rid", rid, "uid", uid)) + if err != nil { + log.Errorf("IslbOnStreamRemove failed %v", err.Error()) + } signal.DelPeer(rid, peer.ID()) return emptyMap, nil } @@ -128,12 +120,18 @@ func publish(peer *signal.Peer, msg proto.PublishMsg) (interface{}, *nprotoo.Err rid := room.ID() uid := peer.ID() - result, err := sfu.SyncRequest(proto.ClientPublish, util.Map("uid", uid, "rid", rid, "jsep", jsep, "options", options)) + resMsg, err := sfu.SyncRequest(proto.ClientPublish, util.Map("uid", uid, "rid", rid, "jsep", jsep, "options", options)) if err != nil { log.Warnf("reject: %d => %s", err.Code, err.Reason) return nil, util.NewNpError(err.Code, err.Reason) } + var result map[string]interface{} + if err := resMsg.Unmarshal(&result); err != nil { + log.Errorf("Unmarshal pub response %v", err) + return nil, err + } + log.Infof("publish: result => %v", result) mid := util.Val(result, "mid") tracks := result["tracks"] @@ -207,12 +205,21 @@ func subscribe(peer *signal.Peer, msg proto.SubscribeMsg) (interface{}, *nprotoo return nil, util.NewNpError(500, "Not found any node for islb.") } - result, err := islb.SyncRequest(proto.IslbGetMediaInfo, util.Map("rid", rid, "mid", mid)) + result, err := islb.SyncRequest(proto.IslbGetMediaInfo, proto.MediaInfo{RID: rid, MID: mid}) if err != nil { log.Warnf("reject: %d => %s", err.Code, err.Reason) return nil, util.NewNpError(err.Code, err.Reason) } - result, err = sfu.SyncRequest(proto.ClientSubscribe, util.Map("uid", uid, "rid", rid, "mid", mid, "tracks", result["tracks"], "jsep", jsep)) + var some map[string]interface{} + if err := result.Unmarshal(&some); err != nil { + return nil, err + } + // subMsg := proto.SFUSubscribeMsg{ + // MediaInfo: proto.MediaInfo{ + // UID: uid, RID: rid, MID: mid, + // }, + // } + result, err = sfu.SyncRequest(proto.ClientSubscribe, util.Map("uid", uid, "rid", rid, "mid", mid, "tracks", some["tracks"], "jsep", jsep)) if err != nil { log.Warnf("reject: %d => %s", err.Code, err.Reason) return nil, util.NewNpError(err.Code, err.Reason) @@ -251,7 +258,7 @@ func broadcast(peer *signal.Peer, msg proto.BroadcastMsg) (interface{}, *nprotoo log.Infof("biz.broadcast peer.ID()=%s msg=%v", peer.ID(), msg) // Validate - if msg.Rid == "" || msg.Uid == "" { + if msg.RID == "" || msg.UID == "" { return nil, ridError } @@ -259,7 +266,7 @@ func broadcast(peer *signal.Peer, msg proto.BroadcastMsg) (interface{}, *nprotoo if !found { return nil, util.NewNpError(500, "Not found any node for islb.") } - rid, uid, info := msg.Rid, msg.Uid, msg.Info + rid, uid, info := msg.RID, msg.UID, msg.Info islb.AsyncRequest(proto.IslbOnBroadcast, util.Map("rid", rid, "uid", uid, "info", info)) return emptyMap, nil } diff --git a/pkg/node/biz/dispatch.go b/pkg/node/biz/dispatch.go index c9ea96cd2..7b6fccaae 100644 --- a/pkg/node/biz/dispatch.go +++ b/pkg/node/biz/dispatch.go @@ -106,27 +106,29 @@ func getRPCForIslb() (*nprotoo.Requestor, bool) { return nil, false } -func handleSFUBroadCast(msg map[string]interface{}, subj string) { - go func(msg map[string]interface{}) { - method := util.Val(msg, "method") - data := msg["data"].(map[string]interface{}) - log.Infof("handleSFUBroadCast: method=%s, data=%v", method, data) - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") - switch method { +func handleSFUBroadCast(msg nprotoo.Notification, subj string) { + go func(msg nprotoo.Notification) { + var data proto.MediaInfo + if err := json.Unmarshal(msg.Data, &data); err != nil { + log.Errorf("handleSFUBroadCast Unmarshall error %v", err) + return + } + + log.Infof("handleSFUBroadCast: method=%s, data=%v", msg.Method, data) + + switch msg.Method { case proto.SFUTrickleICE: - signal.NotifyAllWithoutID(rid, uid, proto.ClientOnStreamAdd, data) + signal.NotifyAllWithoutID(data.RID, data.UID, proto.ClientOnStreamAdd, data) case proto.SFUStreamRemove: - mid := util.Val(data, "mid") islb, found := getRPCForIslb() if found { - islb.AsyncRequest(proto.IslbOnStreamRemove, util.Map("mid", mid)) + islb.AsyncRequest(proto.IslbOnStreamRemove, data) } } }(msg) } -func getRPCForSFU(mid string) (string, *nprotoo.Requestor, *nprotoo.Error) { +func getRPCForSFU(mid proto.MID) (string, *nprotoo.Requestor, *nprotoo.Error) { islb, found := getRPCForIslb() if !found { return "", nil, util.NewNpError(500, "Not found any node for islb.") @@ -136,15 +138,18 @@ func getRPCForSFU(mid string) (string, *nprotoo.Requestor, *nprotoo.Error) { return "", nil, err } + var answer proto.GetSFURPCParams + if err := json.Unmarshal(result, &answer); err != nil { + return "", nil, &nprotoo.Error{Code: 123, Reason: "Unmarshal error getRPCForSFU"} + } + log.Infof("SFU result => %v", result) - rpcID := result["rpc-id"].(string) - eventID := result["event-id"].(string) - nodeID := result["id"].(string) + rpcID := answer.RPCID rpc, found := rpcs[rpcID] if !found { rpc = protoo.NewRequestor(rpcID) - protoo.OnBroadcast(eventID, handleSFUBroadCast) + protoo.OnBroadcast(answer.EventID, handleSFUBroadCast) rpcs[rpcID] = rpc } - return nodeID, rpc, nil + return answer.ID, rpc, nil } diff --git a/pkg/node/biz/internal.go b/pkg/node/biz/internal.go index b0f903605..c56881226 100644 --- a/pkg/node/biz/internal.go +++ b/pkg/node/biz/internal.go @@ -1,42 +1,36 @@ package biz import ( + nprotoo "github.com/cloudwebrtc/nats-protoo" "github.com/pion/ion/pkg/log" "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/signal" - "github.com/pion/ion/pkg/util" ) -// strToMap make string value to map -func strToMap(msg map[string]interface{}, key string) { - val := util.Val(msg, key) - if val != "" { - m := util.Unmarshal(val) - msg[key] = m +// broadcast msg from islb +func handleIslbBroadCast(msg nprotoo.Notification, subj string) { + var isblSignalTransformMap = map[string]string{ + proto.IslbOnStreamAdd: proto.ClientOnStreamAdd, + proto.IslbOnStreamRemove: proto.ClientOnStreamRemove, + proto.IslbClientOnJoin: proto.ClientOnJoin, + proto.IslbClientOnLeave: proto.ClientOnLeave, + proto.IslbOnBroadcast: proto.ClientBroadcast, } -} + go func(msg nprotoo.Notification) { + var data proto.BroadcastMsg + if err := msg.Data.Unmarshal(&data); err != nil { + log.Errorf("Error parsing message %v", err) + return + } + var data2 map[string]interface{} + if err := msg.Data.Unmarshal(&data2); err != nil { + log.Errorf("Error parsing message %v", err) + return + } -// broadcast msg from islb -func handleIslbBroadCast(msg map[string]interface{}, subj string) { - go func(msg map[string]interface{}){ - method := util.Val(msg, "method") - data := msg["data"].(map[string]interface{}) - log.Infof("OnIslbBroadcast: method=%s, data=%v", method, data) - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") - //make signal.Notify send "info" as a json object, otherwise is a string (: - strToMap(data, "info") - switch method { - case proto.IslbOnStreamAdd: - signal.NotifyAllWithoutID(rid, uid, proto.ClientOnStreamAdd, data) - case proto.IslbOnStreamRemove: - signal.NotifyAllWithoutID(rid, uid, proto.ClientOnStreamRemove, data) - case proto.IslbClientOnJoin: - signal.NotifyAllWithoutID(rid, uid, proto.ClientOnJoin, data) - case proto.IslbClientOnLeave: - signal.NotifyAllWithoutID(rid, uid, proto.ClientOnLeave, data) - case proto.IslbOnBroadcast: - signal.NotifyAllWithoutID(rid, uid, proto.ClientBroadcast, data) + log.Infof("OnIslbBroadcast: method=%s, data=%v", msg.Method, data2) + if newMethod, ok := isblSignalTransformMap[msg.Method]; ok { + signal.NotifyAllWithoutID(data.RID, data.UID, newMethod, data2) } }(msg) } diff --git a/pkg/node/islb/internal.go b/pkg/node/islb/internal.go index 195fb8039..925e8fdce 100644 --- a/pkg/node/islb/internal.go +++ b/pkg/node/islb/internal.go @@ -2,6 +2,7 @@ package islb import ( "context" + "encoding/json" "fmt" "strings" "time" @@ -89,12 +90,9 @@ func watchStream(key string) { } /*Find service nodes by name, such as sfu|mcu|sip-gateway|rtmp-gateway */ -func findServiceNode(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - service := util.Val(data, "service") - mid := "" - if data["mid"] != nil { - mid = util.Val(data, "mid") - } +func findServiceNode(data proto.FindServiceParams) (interface{}, *nprotoo.Error) { + service := data.Service + mid := data.MID if mid != "" { mkey := proto.MediaInfo{ DC: dc, @@ -113,7 +111,7 @@ func findServiceNode(data map[string]interface{}) (map[string]interface{}, *npro if service == node.Info["service"] && minfo.NID == id { rpcID := discovery.GetRPCChannel(node) eventID := discovery.GetEventChannel(node) - resp := util.Map("name", name, "rpc-id", rpcID, "event-id", eventID, "service", service, "id", id) + resp := proto.GetSFURPCParams{Name: name, RPCID: rpcID, EventID: eventID, Service: service, ID: id} log.Infof("findServiceNode: by node ID %s, [%s] %s => %s", minfo.NID, service, name, rpcID) return resp, nil } @@ -128,7 +126,7 @@ func findServiceNode(data map[string]interface{}) (map[string]interface{}, *npro eventID := discovery.GetEventChannel(node) name := node.Info["name"] id := node.Info["id"] - resp := util.Map("name", name, "rpc-id", rpcID, "event-id", eventID, "service", service, "id", id) + resp := proto.GetSFURPCParams{Name: name, RPCID: rpcID, EventID: eventID, Service: service, ID: id} log.Infof("findServiceNode: [%s] %s => %s", service, name, rpcID) return resp, nil } @@ -137,25 +135,17 @@ func findServiceNode(data map[string]interface{}) (map[string]interface{}, *npro return nil, util.NewNpError(404, fmt.Sprintf("Service node [%s] not found", service)) } -func streamAdd(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") - nid := util.Val(data, "nid") - mid := util.Val(data, "mid") - +func streamAdd(data proto.StreamAddMsg) (interface{}, *nprotoo.Error) { ukey := proto.UserInfo{ DC: dc, - RID: rid, - UID: uid, - }.BuildKey() - mkey := proto.MediaInfo{ - DC: dc, - NID: nid, - RID: rid, - UID: uid, - MID: mid, + RID: data.RID, + UID: data.UID, }.BuildKey() + mInfo := data.MediaInfo + mInfo.DC = dc + mkey := mInfo.BuildKey() + field, value, err := proto.MarshalNodeField(proto.NodeInfo{ Name: nid, ID: nid, @@ -169,20 +159,10 @@ func streamAdd(data map[string]interface{}) (map[string]interface{}, *nprotoo.Er log.Errorf("Set: %v ", err) } - tracks := data["tracks"].(map[string]interface{}) - for msid, track := range tracks { + for msid, track := range data.Tracks { var infos []proto.TrackInfo - for _, tinfo := range track.([]interface{}) { - tmp := tinfo.(map[string]interface{}) - infos = append(infos, proto.TrackInfo{ - ID: tmp["id"].(string), - Type: tmp["type"].(string), - Ssrc: int(tmp["ssrc"].(float64)), - Payload: int(tmp["pt"].(float64)), - Codec: tmp["codec"].(string), - Fmtp: tmp["fmtp"].(string), - }) - } + infos = append(infos, track...) + field, value, err := proto.MarshalTrackField(msid, infos) if err != nil { log.Errorf("MarshalTrackField: %v ", err) @@ -197,25 +177,26 @@ func streamAdd(data map[string]interface{}) (map[string]interface{}, *nprotoo.Er // dc1/room1/user/info/${uid} info {"name": "Guest"} fields := redis.HGetAll(ukey) - msg := util.Map("rid", rid, "uid", uid, "mid", mid, "info", fields["info"], "tracks", tracks) - log.Infof("Broadcast: [stream-add] => %v", msg) - broadcaster.Say(proto.IslbOnStreamAdd, msg) + + var extraInfo proto.ClientUserInfo = proto.ClientUserInfo{} + if infoStr, ok := fields["info"]; ok { + if err := json.Unmarshal([]byte(infoStr), &extraInfo); err != nil { + log.Errorf("Unmarshal pub extra info %v", err) + extraInfo = data.Info + } + data.Info = extraInfo + } + + log.Infof("Broadcast: [stream-add] => %v", data) + broadcaster.Say(proto.IslbOnStreamAdd, data) watchStream(mkey) - return util.Map(), nil + return struct{}{}, nil } -func streamRemove(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") - mid := util.Val(data, "mid") - - mkey := proto.MediaInfo{ - DC: dc, - RID: rid, - UID: uid, - MID: mid, - }.BuildKey() +func streamRemove(data proto.StreamRemoveMsg) (map[string]interface{}, *nprotoo.Error) { + data.DC = dc + mkey := data.BuildKey() log.Infof("streamRemove: key => %s", mkey) for _, key := range redis.Keys(mkey + "*") { @@ -228,9 +209,8 @@ func streamRemove(data map[string]interface{}) (map[string]interface{}, *nprotoo return util.Map(), nil } -func getPubs(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") +func getPubs(data proto.RoomInfo) (proto.GetPubResp, *nprotoo.Error) { + rid := data.RID //util.Val(data, "rid") key := proto.MediaInfo{ DC: dc, @@ -238,7 +218,7 @@ func getPubs(data map[string]interface{}) (map[string]interface{}, *nprotoo.Erro }.BuildKey() log.Infof("getPubs: root key=%s", key) - var pubs []map[string]interface{} + var pubs []proto.PubInfo for _, path := range redis.Keys(key + "*") { log.Infof("getPubs media info path = %s", path) info, err := proto.ParseMediaInfo(path) @@ -263,67 +243,71 @@ func getPubs(data map[string]interface{}) (map[string]interface{}, *nprotoo.Erro tracks[msid] = *infos } } - pub := util.Map("rid", rid, "uid", info.UID, "mid", info.MID, "info", fields["info"], "tracks", tracks) + + log.Infof("Fields %v", fields) + + var extraInfo proto.ClientUserInfo = proto.ClientUserInfo{} + if infoStr, ok := fields["info"]; ok { + if err := json.Unmarshal([]byte(infoStr), &extraInfo); err != nil { + log.Errorf("Unmarshal pub extra info %v", err) + extraInfo = proto.ClientUserInfo{} // Needed? + } + } + pub := proto.PubInfo{ + MediaInfo: *info, + Info: extraInfo, + Tracks: tracks, + } pubs = append(pubs, pub) } - resp := util.Map("rid", rid, "uid", uid) - resp["pubs"] = pubs + resp := proto.GetPubResp{ + RoomInfo: data, + Pubs: pubs, + } log.Infof("getPubs: resp=%v", resp) return resp, nil } -func clientJoin(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") - info := util.Val(data, "info") - +func clientJoin(data proto.JoinMsg) (interface{}, *nprotoo.Error) { ukey := proto.UserInfo{ DC: dc, - RID: rid, - UID: uid, + RID: data.RID, + UID: data.UID, }.BuildKey() - log.Infof("clientJoin: set %s => %v", ukey, info) - err := redis.HSetTTL(ukey, "info", info, redisLongKeyTTL) + log.Infof("clientJoin: set %s => %v", ukey, &data.Info) + err := redis.HSetTTL(ukey, "info", &data.Info, redisLongKeyTTL) if err != nil { log.Errorf("redis.HSetTTL err = %v", err) } - msg := util.Map("rid", rid, "uid", uid, "info", info) - log.Infof("Broadcast: peer-join = %v", msg) - broadcaster.Say(proto.IslbClientOnJoin, msg) - return util.Map(), nil + log.Infof("Broadcast: peer-join = %v", data) + broadcaster.Say(proto.IslbClientOnJoin, data) + return struct{}{}, nil } -func clientLeave(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") +func clientLeave(data proto.RoomInfo) (interface{}, *nprotoo.Error) { ukey := proto.UserInfo{ DC: dc, - RID: rid, - UID: uid, + RID: data.RID, + UID: data.UID, }.BuildKey() log.Infof("clientLeave: remove key => %s", ukey) err := redis.Del(ukey) if err != nil { log.Errorf("redis.Del err = %v", err) } - msg := util.Map("rid", rid, "uid", uid) - log.Infof("Broadcast peer-leave = %v", msg) + log.Infof("Broadcast peer-leave = %v", data) //make broadcast leave msg after remove stream msg, for ion block bug time.Sleep(500 * time.Millisecond) - broadcaster.Say(proto.IslbClientOnLeave, msg) - return util.Map(), nil + broadcaster.Say(proto.IslbClientOnLeave, data) + return struct{}{}, nil } -func getMediaInfo(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - mid := util.Val(data, "mid") +func getMediaInfo(data proto.MediaInfo) (interface{}, *nprotoo.Error) { + // Ensure DC + data.DC = dc - mkey := proto.MediaInfo{ - DC: dc, - RID: rid, - MID: mid, - }.BuildKey() + mkey := data.BuildKey() log.Infof("getMediaInfo key=%s", mkey) if keys := redis.Keys(mkey + "*"); len(keys) > 0 { @@ -342,7 +326,7 @@ func getMediaInfo(data map[string]interface{}) (map[string]interface{}, *nprotoo } } - resp := util.Map("mid", mid, "tracks", tracks) + resp := util.Map("mid", data.MID, "tracks", tracks) log.Infof("getMediaInfo: resp=%v", resp) return resp, nil } @@ -350,82 +334,102 @@ func getMediaInfo(data map[string]interface{}) (map[string]interface{}, *nprotoo return nil, util.NewNpError(404, "MediaInfo Not found") } -func relay(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - mid := util.Val(data, "mid") - from := util.Val(data, "from") - - key := proto.GetPubNodePath(rid, mid) - info := redis.HGetAll(key) - for ip := range info { - method := util.Map("method", proto.IslbRelay, "sid", from, "mid", mid) - log.Infof("amqp.RpcCall ip=%s, method=%v", ip, method) - //amqp.RpcCall(ip, method, "") - } - return util.Map(), nil -} - -func unRelay(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - mid := util.Val(data, "mid") - from := util.Val(data, "from") - - key := proto.GetPubNodePath(rid, mid) - info := redis.HGetAll(key) - for ip := range info { - method := util.Map("method", proto.IslbUnrelay, "mid", mid, "sid", from) - log.Infof("amqp.RpcCall ip=%s, method=%v", ip, method) - //amqp.RpcCall(ip, method, "") - } - // time.Sleep(time.Millisecond * 10) - resp := util.Map("mid", mid, "sid", from) - log.Infof("unRelay: resp=%v", resp) - return resp, nil -} - -func broadcast(data map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { - rid := util.Val(data, "rid") - uid := util.Val(data, "uid") - info := util.Val(data, "info") - msg := util.Map("rid", rid, "uid", uid, "info", info) - log.Infof("broadcaster.Say msg=%v", msg) - broadcaster.Say(proto.IslbOnBroadcast, msg) - return util.Map(), nil +// func relay(data map[string]interface{}) (interface{}, *nprotoo.Error) { +// rid := util.Val(data, "rid") +// mid := util.Val(data, "mid") +// from := util.Val(data, "from") + +// key := proto.GetPubNodePath(rid, mid) +// info := redis.HGetAll(key) +// for ip := range info { +// method := util.Map("method", proto.IslbRelay, "sid", from, "mid", mid) +// log.Infof("amqp.RpcCall ip=%s, method=%v", ip, method) +// //amqp.RpcCall(ip, method, "") +// } +// return struct{}{}, nil +// } + +// func unRelay(data map[string]interface{}) (interface{}, *nprotoo.Error) { +// rid := util.Val(data, "rid") +// mid := util.Val(data, "mid") +// from := util.Val(data, "from") + +// key := proto.GetPubNodePath(rid, mid) +// info := redis.HGetAll(key) +// for ip := range info { +// method := util.Map("method", proto.IslbUnrelay, "mid", mid, "sid", from) +// log.Infof("amqp.RpcCall ip=%s, method=%v", ip, method) +// //amqp.RpcCall(ip, method, "") +// } +// // time.Sleep(time.Millisecond * 10) +// resp := util.Map("mid", mid, "sid", from) +// log.Infof("unRelay: resp=%v", resp) +// return resp, nil +// } + +func broadcast(data proto.BroadcastMsg) (interface{}, *nprotoo.Error) { + log.Infof("broadcaster.Say msg=%v", data) + broadcaster.Say(proto.IslbOnBroadcast, data) + return struct{}{}, nil } func handleRequest(rpcID string) { log.Infof("handleRequest: rpcID => [%v]", rpcID) - protoo.OnRequest(rpcID, func(request map[string]interface{}, accept nprotoo.AcceptFunc, reject nprotoo.RejectFunc) { - go func(request map[string]interface{}, accept nprotoo.AcceptFunc, reject nprotoo.RejectFunc) { - method := request["method"].(string) - data := request["data"].(map[string]interface{}) - log.Infof("method => %s, data => %v", method, data) + protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { + go func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { + method := request.Method + msg := request.Data + log.Infof("method => %s, data => %v", method, msg) - var result map[string]interface{} + var result interface{} err := util.NewNpError(400, fmt.Sprintf("Unkown method [%s]", method)) switch method { case proto.IslbFindService: - result, err = findServiceNode(data) + var msgData proto.FindServiceParams + if err = msg.Unmarshal(&msgData); err == nil { + result, err = findServiceNode(msgData) + } case proto.IslbOnStreamAdd: - result, err = streamAdd(data) + var msgData proto.StreamAddMsg + if err = msg.Unmarshal(&msgData); err == nil { + result, err = streamAdd(msgData) + } case proto.IslbOnStreamRemove: - result, err = streamRemove(data) + var msgData proto.StreamRemoveMsg + if err = msg.Unmarshal(&msgData); err == nil { + result, err = streamRemove(msgData) + } case proto.IslbGetPubs: - result, err = getPubs(data) + var msgData proto.RoomInfo + if err = msg.Unmarshal(&msgData); err == nil { + result, err = getPubs(msgData) + } case proto.IslbClientOnJoin: - result, err = clientJoin(data) + var msgData proto.JoinMsg + if err = msg.Unmarshal(&msgData); err == nil { + result, err = clientJoin(msgData) + } case proto.IslbClientOnLeave: - result, err = clientLeave(data) + var msgData proto.RoomInfo + if err = msg.Unmarshal(&msgData); err == nil { + result, err = clientLeave(msgData) + } case proto.IslbGetMediaInfo: - result, err = getMediaInfo(data) - case proto.IslbRelay: - result, err = relay(data) - case proto.IslbUnrelay: - result, err = unRelay(data) + var msgData proto.MediaInfo + if err = msg.Unmarshal(&msgData); err == nil { + result, err = getMediaInfo(msgData) + } + // case proto.IslbRelay: + // result, err = relay(data) + // case proto.IslbUnrelay: + // result, err = unRelay(data) case proto.IslbOnBroadcast: - result, err = broadcast(data) + var msgData proto.BroadcastMsg + if err = msg.Unmarshal(&msgData); err == nil { + result, err = broadcast(msgData) + } } if err != nil { diff --git a/pkg/node/sfu/internal.go b/pkg/node/sfu/internal.go index 3cd3540ea..5777732fe 100644 --- a/pkg/node/sfu/internal.go +++ b/pkg/node/sfu/internal.go @@ -19,23 +19,35 @@ var emptyMap = map[string]interface{}{} func handleRequest(rpcID string) { log.Infof("handleRequest: rpcID => [%v]", rpcID) - protoo.OnRequest(rpcID, func(request map[string]interface{}, accept nprotoo.AcceptFunc, reject nprotoo.RejectFunc) { - method := request["method"].(string) - data := request["data"].(map[string]interface{}) + protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { + method := request.Method + data := request.Data log.Debugf("handleRequest: method => %s, data => %v", method, data) - var result map[string]interface{} + var result interface{} err := util.NewNpError(400, fmt.Sprintf("Unkown method [%s]", method)) switch method { case proto.ClientPublish: - result, err = publish(data) + var msgData proto.PublishMsg + if err = data.Unmarshal(&msgData); err == nil { + result, err = publish(msgData) + } case proto.ClientUnPublish: - result, err = unpublish(data) + var msgData proto.UnpublishMsg + if err = data.Unmarshal(&msgData); err == nil { + result, err = unpublish(msgData) + } case proto.ClientSubscribe: - result, err = subscribe(data) + var msgData proto.SFUSubscribeMsg + if err = data.Unmarshal(&msgData); err == nil { + result, err = subscribe(msgData) + } case proto.ClientUnSubscribe: - result, err = unsubscribe(data) + var msgData proto.UnsubscribeMsg + if err = data.Unmarshal(&msgData); err == nil { + result, err = unsubscribe(msgData) + } } if err != nil { @@ -58,33 +70,22 @@ func handleTrickle(r *rtc.Router, t *transport.WebRTCTransport) { } // publish . -func publish(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { +func publish(msg proto.PublishMsg) (interface{}, *nprotoo.Error) { log.Infof("publish msg=%v", msg) - jsep := msg["jsep"].(map[string]interface{}) - if jsep == nil { + if msg.Jsep.SDP == "" { return nil, util.NewNpError(415, "publish: jsep invaild.") } - sdp := util.Val(jsep, "sdp") - uid := util.Val(msg, "uid") + uid := msg.UID mid := uuid.New().String() - offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: sdp} + offer := webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: msg.Jsep.SDP} rtcOptions := transport.RTCOptions{ Publish: true, } - options := msg["options"] - if options != nil { - options, ok := msg["options"].(map[string]interface{}) - if ok { - rtcOptions.Codec = options["codec"].(string) - // TODO parse int - rtcOptions.Bandwidth = int(options["bandwidth"].(float64)) - if util.KvOK(options, "transport-cc", "true") { - rtcOptions.TransportCC = true - } - } - } + rtcOptions.Codec = msg.Options.Codec + rtcOptions.Bandwidth = msg.Options.Bandwidth + rtcOptions.TransportCC = msg.Options.TransportCC videoCodec := strings.ToUpper(rtcOptions.Codec) @@ -116,7 +117,7 @@ func publish(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Error return nil, util.NewNpError(415, "publish: transport.NewWebRTCTransport failed.") } - router := rtc.GetOrNewRouter(mid) + router := rtc.GetOrNewRouter(proto.MID(mid)) go handleTrickle(router, pub) @@ -129,14 +130,19 @@ func publish(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Error router.AddPub(uid, pub) log.Infof("publish tracks %v, answer = %v", tracks, answer) - return util.Map("jsep", answer, "mid", mid, "tracks", tracks), nil + resp := proto.PublishResponseMsg{ + RTCInfo: proto.RTCInfo{Jsep: answer}, + MediaInfo: proto.MediaInfo{MID: proto.MID(mid)}, + Tracks: tracks, + } + return resp, nil } // unpublish . -func unpublish(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { +func unpublish(msg proto.UnpublishMsg) (interface{}, *nprotoo.Error) { log.Infof("unpublish msg=%v", msg) - mid := util.Val(msg, "mid") + mid := msg.MID router := rtc.GetOrNewRouter(mid) if router != nil { rtc.DelRouter(mid) @@ -146,58 +152,36 @@ func unpublish(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Err } // subscribe . -func subscribe(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { +func subscribe(msg proto.SFUSubscribeMsg) (interface{}, *nprotoo.Error) { log.Infof("subscribe msg=%v", msg) - - mid := util.Val(msg, "mid") - router := rtc.GetOrNewRouter(mid) + router := rtc.GetOrNewRouter(msg.MID) if router == nil { return nil, util.NewNpError(404, "subscribe: Router not found!") } - jsep := msg["jsep"].(map[string]interface{}) - if jsep == nil { + if msg.Jsep.SDP == "" { return nil, util.NewNpError(415, "subscribe: Unsupported Media Type") } - sdp := util.Val(jsep, "sdp") + sdp := msg.Jsep.SDP rtcOptions := transport.RTCOptions{ Subscribe: true, } - options := msg["options"] - if options != nil { - options, ok := msg["options"].(map[string]interface{}) - if ok { - rtcOptions.Codec = options["codec"].(string) - rtcOptions.Bandwidth = int(options["bandwidth"].(float64)) // TODO parse - if util.KvOK(options, "transport-cc", "true") { - rtcOptions.TransportCC = true - } - } - } + rtcOptions.Bandwidth = msg.Options.Bandwidth + rtcOptions.TransportCC = msg.Options.TransportCC - subID := uuid.New().String() + subID := proto.MID(uuid.New().String()) - tracksMap := msg["tracks"].(map[string]interface{}) - log.Infof("subscribe tracks=%v", tracksMap) + log.Infof("subscribe tracks=%v", msg.Tracks) rtcOptions.Ssrcpt = make(map[uint32]uint8) tracks := make(map[string]proto.TrackInfo) - for msid, track := range tracksMap { - for _, item := range track.([]interface{}) { - info := item.(map[string]interface{}) - trackInfo := proto.TrackInfo{ - ID: info["id"].(string), - Type: info["type"].(string), - Ssrc: int(info["ssrc"].(float64)), - Payload: int(info["pt"].(float64)), - Codec: info["codec"].(string), - Fmtp: info["fmtp"].(string), - } - rtcOptions.Ssrcpt[uint32(trackInfo.Ssrc)] = uint8(trackInfo.Payload) - tracks[msid] = trackInfo + for msid, track := range msg.Tracks { + for _, item := range track { + rtcOptions.Ssrcpt[uint32(item.Ssrc)] = uint8(item.Payload) + tracks[msid] = item } } @@ -221,7 +205,7 @@ func subscribe(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Err rtcOptions.Codecs = allowedCodecs // New api - sub := transport.NewWebRTCTransport(subID, rtcOptions) + sub := transport.NewWebRTCTransport(string(subID), rtcOptions) if sub == nil { return nil, util.NewNpError(415, "subscribe: transport.NewWebRTCTransport failed.") @@ -264,11 +248,11 @@ func subscribe(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Err } // unsubscribe . -func unsubscribe(msg map[string]interface{}) (map[string]interface{}, *nprotoo.Error) { +func unsubscribe(msg proto.UnsubscribeMsg) (interface{}, *nprotoo.Error) { log.Infof("unsubscribe msg=%v", msg) - mid := util.Val(msg, "mid") + mid := msg.MID found := false - rtc.MapRouter(func(id string, r *rtc.Router) { + rtc.MapRouter(func(id proto.MID, r *rtc.Router) { subs := r.GetSubs() for sid := range subs { if sid == mid { diff --git a/pkg/process/elements/element.go b/pkg/process/element.go similarity index 76% rename from pkg/process/elements/element.go rename to pkg/process/element.go index 3b3d9896d..744ae5051 100644 --- a/pkg/process/elements/element.go +++ b/pkg/process/element.go @@ -1,4 +1,4 @@ -package elements +package process import ( "github.com/pion/ion/pkg/process/samples" @@ -6,7 +6,9 @@ import ( // Element interface type Element interface { + Type() string Write(*samples.Sample) error + Attach(Element) error Read() <-chan *samples.Sample Close() } diff --git a/pkg/process/elements/webm.go b/pkg/process/elements/webm.go deleted file mode 100644 index 83d68b162..000000000 --- a/pkg/process/elements/webm.go +++ /dev/null @@ -1,158 +0,0 @@ -package elements - -import ( - "fmt" - "os" - "path" - - "github.com/at-wat/ebml-go/webm" - - "github.com/pion/ion/pkg/log" - "github.com/pion/ion/pkg/process/samples" -) - -const ( - // TypeWebmSaver . - TypeWebmSaver = "WebmSaver" -) - -var ( - config WebmSaverConfig -) - -// WebmSaverConfig . -type WebmSaverConfig struct { - Enabled bool - Togglable bool - DefaultOn bool - Path string -} - -// WebmSaver Module for saving rtp streams to webm -type WebmSaver struct { - id string - path string - audioWriter, videoWriter webm.BlockWriteCloser - audioTimestamp, videoTimestamp uint32 -} - -// InitWebmSaver sets initial config -func InitWebmSaver(c WebmSaverConfig) { - config = c -} - -// NewWebmSaver Initialize a new webm saver -func NewWebmSaver(id string) *WebmSaver { - return &WebmSaver{ - id: id, - path: config.Path, - } -} - -// Write sample to webmsaver -func (s *WebmSaver) Write(sample *samples.Sample) error { - if sample.Type == samples.TypeVP8 { - s.pushVP8(sample) - } else if sample.Type == samples.TypeOpus { - s.pushOpus(sample) - } - return nil -} - -func (s *WebmSaver) Read() <-chan *samples.Sample { - return nil -} - -// Close Close the WebmSaver -func (s *WebmSaver) Close() { - log.Infof("WebmSaver.Close() => %s", s.id) - if s.audioWriter != nil { - if err := s.audioWriter.Close(); err != nil { - panic(err) - } - } - if s.videoWriter != nil { - if err := s.videoWriter.Close(); err != nil { - panic(err) - } - } -} - -func (s *WebmSaver) pushOpus(sample *samples.Sample) { - if s.audioWriter != nil { - if s.audioTimestamp == 0 { - s.audioTimestamp = sample.Timestamp - } - t := (sample.Timestamp - s.audioTimestamp) / 48 - if _, err := s.audioWriter.Write(true, int64(t), sample.Payload); err != nil { - panic(err) - } - } -} - -func (s *WebmSaver) pushVP8(sample *samples.Sample) { - // Read VP8 header. - videoKeyframe := (sample.Payload[0]&0x1 == 0) - - if videoKeyframe { - // Keyframe has frame information. - raw := uint(sample.Payload[6]) | uint(sample.Payload[7])<<8 | uint(sample.Payload[8])<<16 | uint(sample.Payload[9])<<24 - width := int(raw & 0x3FFF) - height := int((raw >> 16) & 0x3FFF) - - if s.videoWriter == nil || s.audioWriter == nil { - // Initialize WebM saver using received frame size. - s.initWriter(width, height) - } - } - - if s.videoWriter != nil { - if s.videoTimestamp == 0 { - s.videoTimestamp = sample.Timestamp - } - t := (sample.Timestamp - s.videoTimestamp) / 90 - if _, err := s.videoWriter.Write(videoKeyframe, int64(t), sample.Payload); err != nil { - panic(err) - } - } -} - -func (s *WebmSaver) initWriter(width, height int) { - w, err := os.OpenFile(path.Join(s.path, fmt.Sprintf("%s.webm", s.id)), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) - if err != nil { - panic(err) - } - - ws, err := webm.NewSimpleBlockWriter(w, - []webm.TrackEntry{ - { - Name: "Audio", - TrackNumber: 1, - TrackUID: 12345, - CodecID: "A_OPUS", - TrackType: 2, - DefaultDuration: 20000000, - Audio: &webm.Audio{ - SamplingFrequency: 48000.0, - Channels: 2, - }, - }, { - Name: "Video", - TrackNumber: 2, - TrackUID: 67890, - CodecID: "V_VP8", - TrackType: 1, - DefaultDuration: 33333333, - Video: &webm.Video{ - PixelWidth: uint64(width), - PixelHeight: uint64(height), - }, - }, - }) - if err != nil { - panic(err) - } - log.Infof("WebM saver has started with video width=%d, height=%d\n", width, height) - s.audioWriter = ws[0] - s.videoWriter = ws[1] -} diff --git a/pkg/process/elements/webm_test.go b/pkg/process/elements/webm_test.go deleted file mode 100644 index 21e0a901c..000000000 --- a/pkg/process/elements/webm_test.go +++ /dev/null @@ -1,10 +0,0 @@ -package elements - -import ( - "testing" -) - -func TestWebMSaver(t *testing.T) { - saver := NewWebmSaver("id") - saver.Close() -} diff --git a/pkg/process/pipeline.go b/pkg/process/pipeline.go index 39bbbedb3..e28996c0f 100644 --- a/pkg/process/pipeline.go +++ b/pkg/process/pipeline.go @@ -5,7 +5,6 @@ import ( "time" "github.com/pion/ion/pkg/log" - "github.com/pion/ion/pkg/process/elements" "github.com/pion/ion/pkg/process/samples" "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/rtc/transport" @@ -19,8 +18,8 @@ var ( config Config ) -type getDefaultElementsFn func(id string) map[string]elements.Element -type getTogglableElementFn func(msg proto.ElementInfo) (elements.Element, error) +type getDefaultElementsFn func(id string) map[string]Element +type getTogglableElementFn func(msg proto.ElementInfo) (Element, error) // Config for pipeline type Config struct { @@ -38,7 +37,7 @@ type Config struct { // +--->element type Pipeline struct { pub transport.Transport - elements map[string]elements.Element + elements map[string]Element elementLock sync.RWMutex elementChans map[string]chan *samples.Sample sampleBuilder *samples.Builder @@ -99,11 +98,12 @@ func (p *Pipeline) start() { p.elementLock.RLock() // Push to client send queues for _, element := range p.elements { - err := element.Write(sample) - if err != nil { - log.Errorf("element.Write err=%v", err) - continue - } + go func(element Element) { + err := element.Write(sample) + if err != nil { + log.Errorf("element.Write err=%v", err) + } + }(element) } p.elementLock.RUnlock() } @@ -129,7 +129,7 @@ func (p *Pipeline) AddElement(einfo proto.ElementInfo) { } // GetElement get a node by id -func (p *Pipeline) GetElement(id string) elements.Element { +func (p *Pipeline) GetElement(id string) Element { p.elementLock.RLock() defer p.elementLock.RUnlock() return p.elements[id] @@ -152,13 +152,13 @@ func (p *Pipeline) DelElement(id string) { func (p *Pipeline) delElements() { p.elementLock.RLock() - keys := make([]string, 0, len(p.elements)) - for k := range p.elements { - keys = append(keys, k) + ids := make([]string, 0, len(p.elements)) + for id := range p.elements { + ids = append(ids, id) } p.elementLock.RUnlock() - for _, id := range keys { + for _, id := range ids { p.DelElement(id) } } diff --git a/pkg/process/samples/sample.go b/pkg/process/samples/sample.go index d0a0f497d..91f32fc1a 100644 --- a/pkg/process/samples/sample.go +++ b/pkg/process/samples/sample.go @@ -2,11 +2,10 @@ package samples // Types for samples const ( - TypeOpus = 111 - TypeVP8 = 96 - TypeVP9 = 98 - TypeH264 = 102 - TypeRGB24 = 200 + TypeOpus = 1 + TypeVP8 = 2 + TypeVP9 = 3 + TypeH264 = 4 ) // Sample of audio or video diff --git a/pkg/proto/biz.go b/pkg/proto/biz.go index 33285375f..e92acc5e0 100644 --- a/pkg/proto/biz.go +++ b/pkg/proto/biz.go @@ -10,9 +10,17 @@ type ClientUserInfo struct { Name string `json:"name"` } +func (m *ClientUserInfo) MarshalBinary() ([]byte, error) { + return json.Marshal(m) +} + +func (m *ClientUserInfo) UnmarshalBinary(data []byte) error { + return json.Unmarshal(data, m) +} + type RoomInfo struct { - Rid string `json:"rid"` - Uid string `json:"uid"` + RID RID `json:"rid"` + UID UID `json:"uid"` } type RTCInfo struct { @@ -20,12 +28,18 @@ type RTCInfo struct { } type PublishOptions struct { - Codec string `json:"codec"` - Resolution string `json:"resolution"` - Bandwidth int `json:"bandwidth"` - Audio bool `json:"audio"` - Video bool `json:"video"` - Screen bool `json:"screen"` + Codec string `json:"codec"` + Resolution string `json:"resolution"` + Bandwidth int `json:"bandwidth"` + Audio bool `json:"audio"` + Video bool `json:"video"` + Screen bool `json:"screen"` + TransportCC bool `json:"transportCC,omitempty"` +} + +type SubscribeOptions struct { + Bandwidth int `json:"bandwidth"` + TransportCC bool `json:"transportCC"` } type TrackMap map[string][]TrackInfo @@ -65,9 +79,15 @@ type UnpublishMsg struct { MediaInfo } +type SFUSubscribeMsg struct { + SubscribeMsg + Tracks TrackMap `json:"tracks"` +} + type SubscribeMsg struct { MediaInfo RTCInfo + Options SubscribeOptions } type SubscribeResponseMsg struct { diff --git a/pkg/proto/islb.go b/pkg/proto/islb.go new file mode 100644 index 000000000..afc8b0a42 --- /dev/null +++ b/pkg/proto/islb.go @@ -0,0 +1,30 @@ +package proto + +type PubInfo struct { + MediaInfo + Info ClientUserInfo `json:"info"` + Tracks TrackMap `json:"tracks"` +} + +type GetPubResp struct { + RoomInfo + Pubs []PubInfo +} + +type GetMediaParams struct { + RID RID + MID MID +} + +type FindServiceParams struct { + Service string + MID MID +} + +type GetSFURPCParams struct { + RPCID string + EventID string + ID string + Name string + Service string +} diff --git a/pkg/proto/proto.go b/pkg/proto/proto.go index 2adafd23c..d1e3037ce 100644 --- a/pkg/proto/proto.go +++ b/pkg/proto/proto.go @@ -51,6 +51,10 @@ const ( IslbID = "islb" ) +type MID string +type RID string +type UID string + /* media dc/${nid}/${rid}/${uid}/media/pub/${mid} @@ -60,13 +64,12 @@ node2 shadow msid [{ssrc: 1234, pt: 111, type:audio}] msid [{ssrc: 5678, pt: 96, type:video}] */ - type MediaInfo struct { DC string `json:"dc,omitempty"` //Data Center ID NID string `json:"nid,omitempty"` //Node ID - RID string `json:"rid,omitempty"` //Room ID - UID string `json:"uid,omitempty"` //User ID - MID string `json:"mid,omitempty"` //Media ID + RID RID `json:"rid,omitempty"` //Room ID + UID UID `json:"uid,omitempty"` //User ID + MID MID `json:"mid,omitempty"` //Media ID } func (m MediaInfo) BuildKey() string { @@ -85,7 +88,7 @@ func (m MediaInfo) BuildKey() string { if m.MID == "" { m.MID = "*" } - strs := []string{m.DC, m.NID, m.RID, m.UID, "media", "pub", m.MID} + strs := []string{m.DC, m.NID, string(m.RID), string(m.UID), "media", "pub", string(m.MID)} return strings.Join(strs, "/") } @@ -98,9 +101,9 @@ func ParseMediaInfo(key string) (*MediaInfo, error) { } info.DC = arr[0] info.NID = arr[1] - info.RID = arr[2] - info.UID = arr[3] - info.MID = arr[6] + info.RID = RID(arr[2]) + info.UID = UID(arr[3]) + info.MID = MID(arr[6]) return &info, nil } @@ -112,12 +115,12 @@ info {name: "Guest"} type UserInfo struct { DC string - RID string - UID string + RID RID + UID UID } func (u UserInfo) BuildKey() string { - strs := []string{u.DC, u.RID, "user", "info", u.UID} + strs := []string{u.DC, string(u.RID), "user", "info", string(u.UID)} return strings.Join(strs, "/") } @@ -128,8 +131,8 @@ func ParseUserInfo(key string) (*UserInfo, error) { return nil, fmt.Errorf("Can‘t parse userinfo; [%s]", key) } info.DC = arr[0] - info.RID = arr[1] - info.UID = arr[4] + info.RID = RID(arr[1]) + info.UID = UID(arr[4]) return &info, nil } diff --git a/pkg/rtc/router.go b/pkg/rtc/router.go index 8570a6a17..afa903cb7 100644 --- a/pkg/rtc/router.go +++ b/pkg/rtc/router.go @@ -5,6 +5,7 @@ import ( "time" "github.com/pion/ion/pkg/log" + "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/rtc/plugins" "github.com/pion/ion/pkg/rtc/transport" "github.com/pion/ion/pkg/util" @@ -25,23 +26,23 @@ const ( // Router is rtp router type Router struct { pub transport.Transport - subs map[string]transport.Transport + subs map[proto.MID]transport.Transport subLock sync.RWMutex stop bool liveTime time.Time pluginChain *plugins.PluginChain - subChans map[string]chan *rtp.Packet + subChans map[proto.MID]chan *rtp.Packet subShutdownCh chan string } // NewRouter return a new Router -func NewRouter(id string) *Router { +func NewRouter(id proto.MID) *Router { log.Infof("NewRouter id=%s", id) return &Router{ - subs: make(map[string]transport.Transport), + subs: make(map[proto.MID]transport.Transport), liveTime: time.Now().Add(liveCycle), - pluginChain: plugins.NewPluginChain(id), - subChans: make(map[string]chan *rtp.Packet), + pluginChain: plugins.NewPluginChain(string(id)), + subChans: make(map[proto.MID]chan *rtp.Packet), subShutdownCh: make(chan string, 1), } } @@ -66,7 +67,7 @@ func (r *Router) start() { select { case subID := <-r.subShutdownCh: log.Infof("Got transport shutdown %v", subID) - r.DelSub(subID) + r.DelSub(proto.MID(subID)) default: } @@ -103,7 +104,7 @@ func (r *Router) start() { } // AddPub add a pub transport -func (r *Router) AddPub(id string, t transport.Transport) transport.Transport { +func (r *Router) AddPub(id proto.UID, t transport.Transport) transport.Transport { log.Infof("AddPub id=%s", id) r.pub = t r.pluginChain.AttachPub(t) @@ -123,7 +124,7 @@ func (r *Router) DelPub() { r.pub = nil } -func MapRouter(fn func(id string, r *Router)) { +func MapRouter(fn func(id proto.MID, r *Router)) { routerLock.RLock() defer routerLock.RUnlock() for id, r := range routers { @@ -137,7 +138,7 @@ func (r *Router) GetPub() transport.Transport { return r.pub } -func (r *Router) subWriteLoop(subID string, trans transport.Transport) { +func (r *Router) subWriteLoop(subID proto.MID, trans transport.Transport) { for pkt := range r.subChans[subID] { // log.Infof(" WriteRTP %v:%v to %v PT: %v", pkt.SSRC, pkt.SequenceNumber, trans.ID(), pkt.Header.PayloadType) @@ -145,7 +146,7 @@ func (r *Router) subWriteLoop(subID string, trans transport.Transport) { // log.Errorf("wt.WriteRTP err=%v", err) // del sub when err is increasing if trans.WriteErrTotal() > maxWriteErr { - r.DelSub(trans.ID()) + r.DelSub(proto.MID(trans.ID())) } } trans.WriteErrReset() @@ -153,7 +154,7 @@ func (r *Router) subWriteLoop(subID string, trans transport.Transport) { log.Infof("Closing sub writer") } -func (r *Router) subFeedbackLoop(subID string, trans transport.Transport) { +func (r *Router) subFeedbackLoop(subID proto.MID, trans transport.Transport) { for pkt := range trans.GetRTCPChan() { if r.stop { break @@ -195,7 +196,7 @@ func (r *Router) subFeedbackLoop(subID string, trans transport.Transport) { } // AddSub add a sub to router -func (r *Router) AddSub(id string, t transport.Transport) transport.Transport { +func (r *Router) AddSub(id proto.MID, t transport.Transport) transport.Transport { //fix panic: assignment to entry in nil map if r.stop { return nil @@ -214,7 +215,7 @@ func (r *Router) AddSub(id string, t transport.Transport) transport.Transport { } // GetSub get a sub by id -func (r *Router) GetSub(id string) transport.Transport { +func (r *Router) GetSub(id proto.MID) transport.Transport { r.subLock.RLock() defer r.subLock.RUnlock() // log.Infof("Router.GetSub id=%s sub=%v", id, r.subs[id]) @@ -222,7 +223,7 @@ func (r *Router) GetSub(id string) transport.Transport { } // GetSubs get all subs -func (r *Router) GetSubs() map[string]transport.Transport { +func (r *Router) GetSubs() map[proto.MID]transport.Transport { r.subLock.RLock() defer r.subLock.RUnlock() // log.Infof("Router.GetSubs len=%v", len(r.subs)) @@ -239,7 +240,7 @@ func (r *Router) HasNoneSub() bool { } // DelSub del sub by id -func (r *Router) DelSub(id string) { +func (r *Router) DelSub(id proto.MID) { log.Infof("Router.DelSub id=%s", id) r.subLock.Lock() defer r.subLock.Unlock() @@ -257,7 +258,7 @@ func (r *Router) DelSub(id string) { func (r *Router) DelSubs() { log.Infof("Router.DelSubs") r.subLock.RLock() - keys := make([]string, 0, len(r.subs)) + keys := make([]proto.MID, 0, len(r.subs)) for k := range r.subs { keys = append(keys, k) } @@ -279,7 +280,7 @@ func (r *Router) Close() { r.DelSubs() } -func (r *Router) ReSendRTP(sid string, ssrc uint32, sn uint16) bool { +func (r *Router) ReSendRTP(sid proto.MID, ssrc uint32, sn uint16) bool { if r.pub == nil { return false } diff --git a/pkg/rtc/rtc.go b/pkg/rtc/rtc.go index d13521974..acdaf8e74 100644 --- a/pkg/rtc/rtc.go +++ b/pkg/rtc/rtc.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pion/ion/pkg/log" + "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/rtc/plugins" "github.com/pion/ion/pkg/rtc/rtpengine" "github.com/pion/ion/pkg/rtc/transport" @@ -18,11 +19,11 @@ const ( ) var ( - routers = make(map[string]*Router) + routers = make(map[proto.MID]*Router) routerLock sync.RWMutex //CleanChannel return the dead pub's mid - CleanChannel = make(chan string, maxCleanSize) + CleanChannel = make(chan proto.MID, maxCleanSize) pluginsConfig plugins.Config stop bool @@ -77,8 +78,8 @@ func InitRTP(port int, kcpKey, kcpSalt string) error { } log.Infof("accept new rtp id=%s conn=%s", id, rtpTransport.RemoteAddr().String()) - if router := AddRouter(id); router != nil { - router.AddPub(id, rtpTransport) + if router := AddRouter(proto.MID(id)); router != nil { + router.AddPub(proto.UID(id), rtpTransport) } }(rtpTransport) } @@ -87,7 +88,7 @@ func InitRTP(port int, kcpKey, kcpSalt string) error { return nil } -func GetOrNewRouter(id string) *Router { +func GetOrNewRouter(id proto.MID) *Router { log.Infof("rtc.GetOrNewRouter id=%s", id) router := GetRouter(id) if router == nil { @@ -97,7 +98,7 @@ func GetOrNewRouter(id string) *Router { } // GetRouter get router from map -func GetRouter(id string) *Router { +func GetRouter(id proto.MID) *Router { log.Infof("rtc.GetRouter id=%s", id) routerLock.RLock() defer routerLock.RUnlock() @@ -105,7 +106,7 @@ func GetRouter(id string) *Router { } // AddRouter add a new router -func AddRouter(id string) *Router { +func AddRouter(id proto.MID) *Router { log.Infof("rtc.AddRouter id=%s", id) routerLock.Lock() defer routerLock.Unlock() @@ -119,7 +120,7 @@ func AddRouter(id string) *Router { } // DelRouter delete pub -func DelRouter(id string) { +func DelRouter(id proto.MID) { log.Infof("DelRouter id=%s", id) router := GetRouter(id) if router == nil { @@ -165,7 +166,7 @@ func check() { CleanChannel <- id log.Infof("Stat delete %v", id) } - info += "pub: " + id + "\n" + info += "pub: " + string(id) + "\n" subs := router.GetSubs() if len(subs) < 6 { for id := range subs { diff --git a/pkg/signal/handle.go b/pkg/signal/handle.go index 14851267c..e99b5a9a7 100644 --- a/pkg/signal/handle.go +++ b/pkg/signal/handle.go @@ -65,7 +65,7 @@ func in(transport *transport.WebSocketTransport, request *http.Request) { } type CloseMsg struct { - Rid string `json:"rid"` + Rid proto.RID `json:"rid"` } handleClose := func(code int, err string) { diff --git a/pkg/signal/init.go b/pkg/signal/init.go index 340d1915c..621f0e093 100644 --- a/pkg/signal/init.go +++ b/pkg/signal/init.go @@ -9,6 +9,7 @@ import ( "github.com/cloudwebrtc/go-protoo/peer" "github.com/cloudwebrtc/go-protoo/server" "github.com/pion/ion/pkg/log" + "github.com/pion/ion/pkg/proto" ) type AcceptFunc peer.AcceptFunc @@ -24,7 +25,7 @@ const ( var ( bizCall func(method string, peer *Peer, msg json.RawMessage, accept RespondFunc, reject RejectFunc) wsServer *server.WebSocketServer - rooms = make(map[string]*Room) + rooms = make(map[proto.RID]*Room) roomLock sync.RWMutex ) diff --git a/pkg/signal/peer.go b/pkg/signal/peer.go index c6991db15..0ce60e1b7 100644 --- a/pkg/signal/peer.go +++ b/pkg/signal/peer.go @@ -26,7 +26,7 @@ type Peer struct { peer.Peer } -func (c *Peer) Request(method string, data map[string]interface{}) { +func (c *Peer) Request(method string, data interface{}) { c.Peer.Request(method, data, accept, reject) } diff --git a/pkg/signal/room.go b/pkg/signal/room.go index 9ca4248d7..280833127 100644 --- a/pkg/signal/room.go +++ b/pkg/signal/room.go @@ -4,6 +4,7 @@ import ( "github.com/cloudwebrtc/go-protoo/peer" "github.com/cloudwebrtc/go-protoo/room" "github.com/pion/ion/pkg/log" + "github.com/pion/ion/pkg/proto" ) type Room struct { @@ -14,13 +15,13 @@ func (r *Room) AddPeer(peer *Peer) { r.Room.AddPeer(&peer.Peer) } -func (r *Room) ID() string { - return r.Room.ID() +func (r *Room) ID() proto.RID { + return proto.RID(r.Room.ID()) } -func newRoom(id string) *Room { +func newRoom(id proto.RID) *Room { r := &Room{ - Room: *room.NewRoom(id), + Room: *room.NewRoom(string(id)), } roomLock.Lock() rooms[id] = r @@ -28,7 +29,7 @@ func newRoom(id string) *Room { return r } -func getRoom(id string) *Room { +func getRoom(id proto.RID) *Room { roomLock.RLock() r := rooms[id] roomLock.RUnlock() @@ -77,7 +78,7 @@ func GetRoomsByPeer(id string) []*Room { return r } -func DelPeer(rid, id string) { +func DelPeer(rid proto.RID, id string) { log.Infof("DelPeer rid=%s id=%s", rid, id) room := getRoom(rid) if room != nil { @@ -85,7 +86,7 @@ func DelPeer(rid, id string) { } } -func AddPeer(rid string, peer *Peer) { +func AddPeer(rid proto.RID, peer *Peer) { log.Infof("AddPeer rid=%s peer.ID=%s", rid, peer.ID()) room := getRoom(rid) if room == nil { @@ -94,7 +95,7 @@ func AddPeer(rid string, peer *Peer) { room.AddPeer(peer) } -func HasPeer(rid string, peer *Peer) bool { +func HasPeer(rid proto.RID, peer *Peer) bool { log.Debugf("HasPeer rid=%s peer.ID=%s", rid, peer.ID()) room := getRoom(rid) if room == nil { @@ -103,7 +104,7 @@ func HasPeer(rid string, peer *Peer) bool { return room.GetPeer(peer.ID()) != nil } -func NotifyAllWithoutPeer(rid string, peer *Peer, method string, msg map[string]interface{}) { +func NotifyAllWithoutPeer(rid proto.RID, peer *Peer, method string, msg interface{}) { log.Debugf("signal.NotifyAllWithoutPeer rid=%s peer.ID=%s method=%s msg=%v", rid, peer.ID(), method, msg) room := getRoom(rid) if room != nil { @@ -112,7 +113,7 @@ func NotifyAllWithoutPeer(rid string, peer *Peer, method string, msg map[string] } } -func NotifyAll(rid string, method string, msg map[string]interface{}) { +func NotifyAll(rid proto.RID, method string, msg interface{}) { room := getRoom(rid) if room != nil { room.Map(func(id string, peer *peer.Peer) { @@ -123,12 +124,12 @@ func NotifyAll(rid string, method string, msg map[string]interface{}) { } } -func NotifyAllWithoutID(rid string, skipID string, method string, msg map[string]interface{}) { +func NotifyAllWithoutID(rid proto.RID, skipID proto.UID, method string, msg interface{}) { room := getRoom(rid) log.Infof("room => %v", rid) if room != nil { room.Map(func(id string, peer *peer.Peer) { - if peer != nil && peer.ID() != skipID { + if peer != nil && proto.UID(peer.ID()) != skipID { peer.Notify(method, msg) } })