From 4401e7649c36b31856ba01a2325f92f68a4d80a6 Mon Sep 17 00:00:00 2001 From: cgojin Date: Fri, 30 Oct 2020 21:00:31 +0800 Subject: [PATCH] feat(proto): add nats-rpc --- .gitignore | 2 + README.md | 2 +- cmd/avp/main.go | 5 +- cmd/biz/main.go | 7 +- cmd/islb/main.go | 5 +- cmd/sfu/main.go | 4 +- go.mod | 2 +- go.sum | 3 - pkg/discovery/service.go | 2 +- pkg/node/avp/init.go | 14 +- pkg/node/avp/internal.go | 39 ++-- pkg/node/avp/sfu.go | 184 ++++++++-------- pkg/node/biz/client.go | 247 +++++++++++----------- pkg/node/biz/dispatch.go | 48 ++--- pkg/node/biz/{error_codes.go => error.go} | 13 ++ pkg/node/biz/init.go | 79 ++++--- pkg/node/biz/internal.go | 100 ++++----- pkg/node/islb/init.go | 43 +++- pkg/node/islb/internal.go | 222 ++++++------------- pkg/node/sfu/init.go | 12 +- pkg/node/sfu/internal.go | 124 +++++------ pkg/proto/biz.go | 93 +++++--- pkg/proto/nats.go | 175 +++++++++++++++ pkg/proto/nats_test.go | 116 ++++++++++ pkg/signal/handle.go | 14 +- pkg/signal/peer.go | 13 +- pkg/signal/room.go | 2 +- pkg/util/util.go | 8 - 28 files changed, 901 insertions(+), 677 deletions(-) rename pkg/node/biz/{error_codes.go => error.go} (78%) create mode 100644 pkg/proto/nats.go create mode 100644 pkg/proto/nats_test.go diff --git a/.gitignore b/.gitignore index f7a33d624..c2148dddf 100644 --- a/.gitignore +++ b/.gitignore @@ -14,5 +14,7 @@ cmd/islb/islb cmd/avp/avp ./ion .idea +.vscode node_modules/ cover.out + diff --git a/README.md b/README.md index 3dc287709..c1308097a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ + **ION** is a secure, self-hosted [WebRTC](https://webrtc.org/) video conferencing [SFU (what is an SFU?)](https://testrtc.com/different-multiparty-video-conferencing/), that you can host today in the cloud or on-premise. + This **ION** repository contains the backend cluster services, so you also need to deploy the [web app](https://github.com/pion/ion-app-web) or install a [flutter client for web, desktop or mobile](https://github.com/pion/ion-app-flutter) + **ION**'s mission is to deliver world-class tools for creating communication systems, and many people build their projects on top of it: - + [**ION** Cluster](https://github.com/pion/ion) (This project!) is composed of two services [`biz` + `ISLB` (see glossary)](docs/glossary.md) and uses `NATS`, `etcd` and `redis` as databases to administer room membership, manage text chat, verify JWT authentication and assign clients to the proper SFU in a multi-datacenter architecture. **ION** Cluster also builds its own version of `ion-sfu` binary, which is lightly adapted to use `NATS-protoo` for signaling (which is how `biz` and `ISLB` trade messages internally). + + [**ION** Cluster](https://github.com/pion/ion) (This project!) is composed of two services [`biz` + `ISLB` (see glossary)](docs/glossary.md) and uses `NATS`, `etcd` and `redis` as databases to administer room membership, manage text chat, verify JWT authentication and assign clients to the proper SFU in a multi-datacenter architecture. **ION** Cluster also builds its own version of `ion-sfu` binary, which is lightly adapted to use `NatsRPC` for signaling (which is how `biz` and `ISLB` trade messages internally). + [`ion-sfu` (external)](https://github.com/pion/ion-sfu), which handles WebRTC streams, can be used as a standalone SFU for designing custom chat experiences or implementing your own scaling architecture. `ion-sfu` is equally capable of forwarding Video, Audio and DataChannel tracks, and can handle arbitrary non-media data transport. + [`ion-avp` Audio/Video Processing](https://github.com/pion/ion-avp) (WIP) is a sidecar utility for running realtime AV processing jobs, including `write-to-disk`, `ffmpeg` and `openCV` + *`ion-live` LIVE node (planned)* - A feed streaming gateway for supporting publishing to and from SIP/RTMP/HLS/RTSP endpoints diff --git a/cmd/avp/main.go b/cmd/avp/main.go index fd2f5df3d..66fc0a952 100644 --- a/cmd/avp/main.go +++ b/cmd/avp/main.go @@ -47,10 +47,7 @@ func main() { serviceNode := discovery.NewServiceNode(conf.Etcd.Addrs, conf.Global.Dc) serviceNode.RegisterNode("avp", "node-avp", "avp-channel-id") - - rpcID := serviceNode.GetRPCChannel() - eventID := serviceNode.GetEventChannel() - avp.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], rpcID, eventID, conf.Nats.URL) + avp.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], conf.Nats.URL) select {} } diff --git a/cmd/biz/main.go b/cmd/biz/main.go index e1322a4e1..34e11276e 100644 --- a/cmd/biz/main.go +++ b/cmd/biz/main.go @@ -45,13 +45,10 @@ func main() { serviceNode := discovery.NewServiceNode(conf.Etcd.Addrs, conf.Global.Dc) serviceNode.RegisterNode("biz", "node-biz", "biz-channel-id") - - rpcID := serviceNode.GetRPCChannel() - eventID := serviceNode.GetEventChannel() - biz.Init(conf.Global.Dc, serviceNode.NodeInfo().ID, rpcID, eventID, conf.Nats.URL, conf.Signal.AuthRoom, conf.Avp.Elements) + biz.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], conf.Nats.URL, conf.Signal.AuthRoom, conf.Avp.Elements) serviceWatcher := discovery.NewServiceWatcher(conf.Etcd.Addrs, conf.Global.Dc) - go serviceWatcher.WatchServiceNode("islb", biz.WatchServiceNodes) + go serviceWatcher.WatchServiceNode("islb", biz.WatchIslbNodes) defer close() select {} diff --git a/cmd/islb/main.go b/cmd/islb/main.go index d134214dd..fd84c1e59 100644 --- a/cmd/islb/main.go +++ b/cmd/islb/main.go @@ -38,9 +38,8 @@ func main() { Pwd: conf.Redis.Pwd, DB: conf.Redis.DB, } - rpcID := serviceNode.GetRPCChannel() - eventID := serviceNode.GetEventChannel() - islb.Init(conf.Global.Dc, serviceNode.NodeInfo().ID, rpcID, eventID, redisCfg, conf.Etcd.Addrs, conf.Nats.URL) + + islb.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], redisCfg, conf.Etcd.Addrs, conf.Nats.URL) serviceWatcher := discovery.NewServiceWatcher(conf.Etcd.Addrs, conf.Global.Dc) go serviceWatcher.WatchServiceNode("", islb.WatchServiceNodes) diff --git a/cmd/sfu/main.go b/cmd/sfu/main.go index 53022c21f..9cc24ffea 100644 --- a/cmd/sfu/main.go +++ b/cmd/sfu/main.go @@ -39,8 +39,6 @@ func main() { serviceNode := discovery.NewServiceNode(conf.Etcd.Addrs, conf.Global.Dc) serviceNode.RegisterNode("sfu", "node-sfu", "sfu-channel-id") - rpcID := serviceNode.GetRPCChannel() - eventID := serviceNode.GetEventChannel() - sfu.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], rpcID, eventID, conf.Nats.URL) + sfu.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], conf.Nats.URL) select {} } diff --git a/go.mod b/go.mod index d13cc7c54..17ea3c545 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.26.0 require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/cloudwebrtc/go-protoo v0.0.0-20200926140535-79ecde67b906 - github.com/cloudwebrtc/nats-protoo v0.0.0-20200604135451-87b43396e8de github.com/coreos/etcd v3.3.25+incompatible // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/go-ole/go-ole v1.2.4 // indirect @@ -16,6 +15,7 @@ require ( github.com/google/go-cmp v0.5.2 // indirect github.com/google/uuid v1.1.2 github.com/gorilla/websocket v1.4.2 + github.com/nats-io/nats.go v1.10.0 github.com/notedit/sdp v0.0.4 github.com/pion/ion-avp v1.0.39 github.com/pion/ion-log v0.0.0-20201024224650-e6b94dfeaf1d diff --git a/go.sum b/go.sum index 43bf1b70e..f078bb1e8 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,6 @@ github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 h1:xz6Nv3zcw github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9/go.mod h1:2wSM9zJkl1UQEFZgSd68NfCgRz1VL1jzy/RjCg+ULrs= github.com/cloudwebrtc/go-protoo v0.0.0-20200926140535-79ecde67b906 h1:CXJrfUVNhSAKWnb+oSvd8MBCQJHK6+RS7jLvx2z3ba0= github.com/cloudwebrtc/go-protoo v0.0.0-20200926140535-79ecde67b906/go.mod h1:Q0DiItmsD5iCBdeID9Xu03ok8bemc78XJ+0rYATQbuQ= -github.com/cloudwebrtc/nats-protoo v0.0.0-20200604135451-87b43396e8de h1:yPvjphU5iEeYTOOPzSwU9TNoy0nOwtv5LYMZMVtOrPY= -github.com/cloudwebrtc/nats-protoo v0.0.0-20200604135451-87b43396e8de/go.mod h1:zwKwTqbrcBl9o2AHopHYlMh4KM3wMQHYeR6TrtoRCQ0= github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= @@ -347,7 +345,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.14.3/go.mod h1:3WXPzbXEEliJ+a6UFE4vhIxV8qR1EML6ngzP9ug4eYg= -github.com/rs/zerolog v1.17.2/go.mod h1:9nvC1axdVrAHcu/s9taAVfBuIdTZLVQmKQyvrUjF5+I= github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs= github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= diff --git a/pkg/discovery/service.go b/pkg/discovery/service.go index 149308521..f6e0b6791 100644 --- a/pkg/discovery/service.go +++ b/pkg/discovery/service.go @@ -94,7 +94,7 @@ func decode(ds []byte) map[string]string { var s map[string]string err := json.Unmarshal(ds, &s) if err != nil { - log.Errorf("service.decode err => %+v", err) + log.Errorf("service.decode err => %v", err) return nil } return s diff --git a/pkg/node/avp/init.go b/pkg/node/avp/init.go index 0631d89b6..15c03a758 100644 --- a/pkg/node/avp/init.go +++ b/pkg/node/avp/init.go @@ -1,21 +1,19 @@ package avp -import ( - nprotoo "github.com/cloudwebrtc/nats-protoo" -) +import "github.com/pion/ion/pkg/proto" var ( //nolint:unused dc = "default" //nolint:unused - nid = "avp-unkown-node-id" - protoo *nprotoo.NatsProtoo + nid = "avp-unkown-node-id" + nrpc *proto.NatsRPC ) // Init func -func Init(dcID, nodeID, rpcID, eventID, natsURL string) { +func Init(dcID, nodeID, natsURL string) { dc = dcID nid = nodeID - protoo = nprotoo.NewNatsProtoo(natsURL) - handleRequest(rpcID) + nrpc = proto.NewNatsRPC(natsURL) + handleRequest(nid) } diff --git a/pkg/node/avp/internal.go b/pkg/node/avp/internal.go index 458d1c049..e9613c478 100644 --- a/pkg/node/avp/internal.go +++ b/pkg/node/avp/internal.go @@ -1,40 +1,31 @@ package avp import ( - "fmt" + "errors" - nprotoo "github.com/cloudwebrtc/nats-protoo" log "github.com/pion/ion-log" "github.com/pion/ion/pkg/proto" - "github.com/pion/ion/pkg/util" ) func handleRequest(rpcID string) { - log.Infof("handleRequest: rpcID => [%v]", rpcID) - protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { - method := request.Method - data := request.Data - log.Infof("handleRequest: method => %s, data => %s", method, data) + log.Infof("handleRequest: rpcID => [%s]", rpcID) - var result interface{} - errResult := util.NewNpError(400, fmt.Sprintf("Unknown method [%s]", method)) + _, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { + log.Infof("handleRequest: %T, %+v", msg, msg) - switch method { - case proto.AvpProcess: - var msg proto.ToAvpProcessMsg - if errResult = data.Unmarshal(&msg); errResult != nil { - break + switch v := msg.(type) { + case *proto.ToAvpProcessMsg: + if err := s.Process(v.Addr, v.PID, v.SID, v.TID, v.EID, v.Config); err != nil { + return nil, err } - if err := s.Process(msg.Addr, msg.PID, msg.SID, msg.TID, msg.EID, msg.Config); err != nil { - errResult = util.NewNpError(500, err.Error()) - } - errResult = nil + default: + return nil, errors.New("unkonw message") } - if errResult != nil { - reject(errResult.Code, errResult.Reason) - } else { - accept(result) - } + return nil, nil }) + + if err != nil { + log.Errorf("nrpc subscribe error: %v", err) + } } diff --git a/pkg/node/avp/sfu.go b/pkg/node/avp/sfu.go index a9721f43f..c32a90389 100644 --- a/pkg/node/avp/sfu.go +++ b/pkg/node/avp/sfu.go @@ -2,16 +2,14 @@ package avp import ( "context" - "encoding/json" - "fmt" + "errors" "sync" - nprotoo "github.com/cloudwebrtc/nats-protoo" "github.com/google/uuid" + "github.com/nats-io/nats.go" iavp "github.com/pion/ion-avp/pkg" log "github.com/pion/ion-log" "github.com/pion/ion/pkg/proto" - "github.com/pion/ion/pkg/util" "github.com/pion/webrtc/v3" ) @@ -19,12 +17,11 @@ import ( type sfu struct { ctx context.Context cancel context.CancelFunc - client *nprotoo.Requestor config iavp.Config mu sync.RWMutex - addr string - mid proto.MID + client string + mid proto.MID onCloseFn func() transports map[string]*iavp.WebRTCTransport @@ -34,18 +31,14 @@ type sfu struct { func newSFU(addr string, config iavp.Config) (*sfu, error) { log.Infof("Connecting to sfu: %s", addr) - // Set up a connection to the sfu server. - client := protoo.NewRequestor("rpc-" + addr) - ctx, cancel := context.WithCancel(context.Background()) return &sfu{ ctx: ctx, cancel: cancel, - client: client, + client: addr, config: config, - addr: addr, - mid: proto.MID(uuid.New().String()), + mid: proto.MID(uuid.New().String()), transports: make(map[string]*iavp.WebRTCTransport), }, nil @@ -61,13 +54,15 @@ func (s *sfu) getTransport(sid string) (*iavp.WebRTCTransport, error) { // no transport yet, create one if t == nil { var err error - if t, err = s.join(sid); err != nil { + var sub *nats.Subscription + if t, sub, err = s.join(sid); err != nil { return nil, err } t.OnClose(func() { s.mu.Lock() defer s.mu.Unlock() delete(s.transports, sid) + sub.Unsubscribe() if len(s.transports) == 0 && s.onCloseFn != nil { s.cancel() s.onCloseFn() @@ -84,123 +79,112 @@ func (s *sfu) onClose(f func()) { s.onCloseFn = f } -func (s *sfu) join(sid string) (*iavp.WebRTCTransport, error) { +func (s *sfu) join(sid string) (*iavp.WebRTCTransport, *nats.Subscription, error) { log.Infof("Joining sfu session: %s", sid) t := iavp.NewWebRTCTransport(sid, s.config) + // handle sfu message + rpcID := nid + "-" + sid + sub, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { + log.Infof("handle sfu message: %+v", msg) + + switch v := msg.(type) { + case *proto.SfuTrickleMsg: + log.Infof("got remote candidate: %v", v.Candidate) + if err := t.AddICECandidate(v.Candidate); err != nil { + log.Errorf("add ice candidate error: %s", err) + return nil, err + } + case *proto.SfuOfferMsg: + log.Infof("got remote description: %v", v.Jsep) + if err := t.SetRemoteDescription(v.Jsep); err != nil { + log.Errorf("set remote description error: ", err) + return nil, err + } + + answer, err := t.CreateAnswer() + if err != nil { + log.Errorf("create answer error: ", err) + return nil, err + } + + if err = t.SetLocalDescription(answer); err != nil { + log.Errorf("set local description error: ", err) + return nil, err + } + + log.Infof("send description to [%s]: %v", s.client, answer) + if err := nrpc.Publish(s.client, proto.SfuAnswerMsg{ + MID: v.MID, + RTCInfo: proto.RTCInfo{Jsep: answer}, + }); err != nil { + log.Errorf("send description to [%s] error: %v", s.client, err) + return nil, err + } + default: + return nil, errors.New("unkonw message") + } + + return nil, nil + }) + if err != nil { + log.Errorf("nrpc subscribe error: %v", err) + } + + // join to sfu offer, err := t.CreateOffer() if err != nil { log.Errorf("Error creating offer: %v", err) - return nil, err + return nil, nil, err } - if err = t.SetLocalDescription(offer); err != nil { log.Errorf("Error setting local description: %v", err) - return nil, err + return nil, nil, err } - - rpcID := "rpc-" + nid + "-" + sid - - log.Infof("Send offer:\n %s", offer) - resp, npErr := s.client.SyncRequest(proto.SfuClientJoin, proto.ToSfuJoinMsg{ + req := proto.ToSfuJoinMsg{ RPCID: rpcID, MID: s.mid, SID: proto.SID(sid), RTCInfo: proto.RTCInfo{Jsep: offer}, - }) - if npErr != nil { - log.Errorf("Error sending join request: %s, %v", resp, npErr) - return nil, err } - var msg proto.FromSfuJoinMsg - if err := json.Unmarshal(resp, &msg); err != nil { - log.Errorf("SfuClientOnJoin failed %v", err) + log.Infof("join to [%s]: %v", s.client, req) + resp, err := nrpc.Request(s.client, req) + if err != nil { + log.Errorf("join to [%s] failed: %s", s.client, err) + return nil, nil, err + } + msg, ok := resp.(*proto.FromSfuJoinMsg) + if !ok { + log.Errorf("join reply msg parses failed") + return nil, nil, errors.New("join reply msg parses failed") } - log.Infof("Join reply: %v", msg) + log.Infof("join reply: %v", msg) if err := t.SetRemoteDescription(msg.Jsep); err != nil { log.Errorf("Error set remote description: %s", err) - return nil, err + return nil, nil, err } + // send candidates to sfu t.OnICECandidate(func(c *webrtc.ICECandidate) { log.Errorf("OnICECandidate: %v", c) if c == nil { // Gathering done return } - s.client.AsyncRequest(proto.SfuClientTrickle, proto.SfuTrickleMsg{ + data := proto.SfuTrickleMsg{ MID: s.mid, Candidate: c.ToJSON(), - }) - }) - - // handle sfu message - protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { - method := request.Method - data := request.Data - log.Infof("handle sfu message: method => %s, data => %s", method, data) - - var result interface{} - errResult := util.NewNpError(400, fmt.Sprintf("unknown method [%s]", method)) - - switch method { - case proto.SfuTrickleICE: - var msg proto.SfuTrickleMsg - if err := data.Unmarshal(&msg); err != nil { - log.Errorf("trickle message unmarshal error: %s", err) - errResult = util.NewNpError(415, "trickle message unmarshal error") - break - } - - if err := t.AddICECandidate(msg.Candidate); err != nil { - log.Errorf("add ice candidate error: %s", err) - errResult = util.NewNpError(415, "add ice candidate error") - break - } - errResult = nil - case proto.SfuClientOffer: - var msg proto.SfuNegotiationMsg - if err := data.Unmarshal(&msg); err != nil { - log.Errorf("offer message unmarshal error: %s", err) - errResult = util.NewNpError(415, "offer message unmarshal error") - break - } - log.Infof("got remote description: %v", msg.Jsep) - - if err := t.SetRemoteDescription(msg.Jsep); err != nil { - log.Errorf("set remote description error: ", err) - errResult = util.NewNpError(415, "set remote sdp error") - break - } - - var err error - var answer webrtc.SessionDescription - if answer, err = t.CreateAnswer(); err != nil { - log.Errorf("create answer error: ", err) - errResult = util.NewNpError(415, "create answer error") - } - - if err = t.SetLocalDescription(answer); err != nil { - log.Errorf("set local description error: ", err) - errResult = util.NewNpError(415, "create answer error") - } - - log.Infof("create local description: %v", answer) - - s.client.AsyncRequest(proto.SfuClientAnswer, proto.SfuNegotiationMsg{ - MID: msg.MID, - RTCInfo: proto.RTCInfo{Jsep: answer}, - }) - errResult = nil } - - if errResult != nil { - reject(errResult.Code, errResult.Reason) - } else { - accept(result) + log.Infof("send trickle to [%s]: %v", s.client, data) + if err := nrpc.Publish(s.client, data); err != nil { + log.Errorf("send trickle to [%s] error: %v", s.client, err) } }) - return t, nil + if err != nil { + log.Errorf("nrpc subscribe error: %v", err) + } + + return t, sub, nil } diff --git a/pkg/node/biz/client.go b/pkg/node/biz/client.go index 0b1ec5ddd..99957fabf 100644 --- a/pkg/node/biz/client.go +++ b/pkg/node/biz/client.go @@ -1,27 +1,24 @@ package biz import ( - "encoding/json" - "fmt" + "errors" - nprotoo "github.com/cloudwebrtc/nats-protoo" "github.com/google/uuid" "github.com/notedit/sdp" log "github.com/pion/ion-log" "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/signal" - "github.com/pion/ion/pkg/util" ) var ( - ridError = util.NewNpError(codeRoomErr, codeStr(codeRoomErr)) - jsepError = util.NewNpError(codeJsepErr, codeStr(codeJsepErr)) - // sdpError = util.NewNpError(codeSDPErr, codeStr(codeSDPErr)) - midError = util.NewNpError(codeMIDErr, codeStr(codeMIDErr)) + ridError = newError(codeRoomErr, codeStr(codeRoomErr)) + jsepError = newError(codeJsepErr, codeStr(codeJsepErr)) + sdpError = newError(codeSDPErr, codeStr(codeSDPErr)) + midError = newError(codeMIDErr, codeStr(codeMIDErr)) ) // join room -func join(peer *signal.Peer, msg proto.FromClientJoinMsg) (interface{}, *nprotoo.Error) { +func join(peer *signal.Peer, msg proto.FromClientJoinMsg) (interface{}, *httpError) { log.Infof("biz.join peer.ID()=%s msg=%v", peer.ID(), msg) rid := msg.RID @@ -31,19 +28,19 @@ func join(peer *signal.Peer, msg proto.FromClientJoinMsg) (interface{}, *nprotoo } sdpInfo, err := sdp.Parse(msg.Jsep.SDP) if err != nil { - return nil, util.NewNpError(400, "Could not parse SDP") + return nil, sdpError } - islb := getRPCForIslb() - if islb == nil { - return nil, util.NewNpError(500, "Not found any node for islb.") + islb := getIslb() + if islb == "" { + return nil, newError(500, "Not found any node for islb.") } uid := peer.ID() // already joined this room, removing old peer if p := signal.GetPeer(rid, uid); p != nil { log.Infof("biz.join peer.ID()=%s already joined, removing old peer", uid) - if _, err := islb.SyncRequest(proto.IslbPeerLeave, proto.IslbPeerLeaveMsg{ + if _, err := nrpc.Request(islb, proto.IslbPeerLeaveMsg{ RoomInfo: proto.RoomInfo{UID: uid, RID: msg.RID}, }); err != nil { log.Errorf("IslbClientOnLeave failed %v", err.Error()) @@ -54,115 +51,100 @@ func join(peer *signal.Peer, msg proto.FromClientJoinMsg) (interface{}, *nprotoo signal.AddPeer(rid, peer) mid := proto.MID(uuid.New().String()) - sfuID, sfu, npErr := getRPCForNode("sfu", islb, uid, rid, mid) - if npErr != nil { - log.Errorf("error getting sfu: %v", npErr) - return nil, util.NewNpError(500, "Not found any node for sfu.") + sfu, err := getNode("sfu", islb, uid, rid, mid) + if err != nil { + log.Errorf("error getting sfu: %v", err) + return nil, newError(500, "Not found any node for sfu.") } info := msg.Info - // Send join => islb - resp, npErr := islb.SyncRequest(proto.IslbPeerJoin, proto.ToIslbPeerJoinMsg{ + // join to islb + resp, err := nrpc.Request(islb, proto.ToIslbPeerJoinMsg{ UID: uid, RID: rid, MID: mid, Info: info, }) - if npErr != nil { - log.Errorf("IslbClientOnJoin failed %v", npErr) - } - var fromIslbPeerJoinMsg proto.FromIslbPeerJoinMsg - if err := json.Unmarshal(resp, &fromIslbPeerJoinMsg); err != nil { + if err != nil { log.Errorf("IslbClientOnJoin failed %v", err) } - - rpcID := "rpc-" + nid + "-" + string(fromIslbPeerJoinMsg.SID) + "-" + string(uid) - - // Send join => sfu - resp, npErr = sfu.SyncRequest(proto.SfuClientJoin, proto.ToSfuJoinMsg{ - RPCID: rpcID, - MID: mid, - SID: fromIslbPeerJoinMsg.SID, - RTCInfo: msg.RTCInfo, - }) - if npErr != nil { - log.Errorf("SfuClientOnJoin failed %v", npErr) - } - var fromSfuJoinMsg proto.FromSfuJoinMsg - if err := json.Unmarshal(resp, &fromSfuJoinMsg); err != nil { - log.Errorf("SfuClientOnJoin failed %v", err) + fromIslbPeerJoinMsg, ok := resp.(*proto.FromIslbPeerJoinMsg) + if !ok { + log.Errorf("IslbClientOnJoin failed %v", fromIslbPeerJoinMsg) } // handle sfu message - protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { - method := request.Method - data := request.Data - log.Infof("peer(%s) handle sfu message: method => %s, data => %s", uid, method, data) - - var result interface{} - errResult := util.NewNpError(400, fmt.Sprintf("unknown method [%s]", method)) - - switch method { - case proto.SfuTrickleICE: - var msg proto.SfuTrickleMsg - if err := data.Unmarshal(&msg); err != nil { - log.Errorf("peer(%s) trickle message unmarshal error: %s", uid, err) - errResult = util.NewNpError(415, "trickle message unmarshal error") - break - } + rpcID := nid + "-" + string(fromIslbPeerJoinMsg.SID) + "-" + string(uid) + sub, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { + log.Infof("peer(%s) handle sfu message: %+v", uid, msg) + switch v := msg.(type) { + case *proto.SfuTrickleMsg: + log.Infof("peer(%s) got a remote candidate: %s", uid, v.Candidate) signal.NotifyPeer(proto.ClientTrickleICE, rid, uid, proto.ClientTrickleMsg{ RID: rid, - MID: msg.MID, - Candidate: msg.Candidate, + MID: v.MID, + Candidate: proto.CandidateForJSON(v.Candidate), }) - errResult = nil - case proto.SfuClientOffer: - var msg proto.SfuNegotiationMsg - if err := data.Unmarshal(&msg); err != nil { - log.Errorf("peer(%s) offer message unmarshal error: %s", uid, err) - errResult = util.NewNpError(415, "offer message unmarshal error") - break - } - log.Infof("peer(%s) got remote description: %v", uid, msg.Jsep) - signal.NotifyPeer(proto.ClientOffer, rid, uid, proto.ClientNegotiationMsg{ + case *proto.SfuOfferMsg: + log.Infof("peer(%s) got remote description: %s", uid, v.Jsep) + signal.NotifyPeer(proto.ClientOffer, rid, uid, proto.ClientOfferMsg{ RID: rid, - MID: msg.MID, - RTCInfo: msg.RTCInfo, + MID: v.MID, + RTCInfo: v.RTCInfo, }) - errResult = nil + default: + return nil, errors.New("unkonw message") } + return nil, nil + }) + if err != nil { + log.Errorf("subscribe sfu failed: %v", err) + return nil, newError(500, "subscribe sfu failed") + } + peer.SetCloseFun(func() { + sub.Unsubscribe() + }) - if errResult != nil { - reject(errResult.Code, errResult.Reason) - } else { - accept(result) - } + // join to sfu + resp, err = nrpc.Request(sfu, proto.ToSfuJoinMsg{ + RPCID: rpcID, + MID: mid, + SID: fromIslbPeerJoinMsg.SID, + RTCInfo: msg.RTCInfo, }) + if err != nil { + log.Errorf("join sfu error: %v", err) + } + fromSfuJoinMsg, ok := resp.(*proto.FromSfuJoinMsg) + if !ok { + log.Errorf("join reply msg parses failed") + return nil, newError(500, "join reply msg parses failed") + } // Associate the stream in the SDP with the UID/RID/MID. for key := range sdpInfo.GetStreams() { - islb.AsyncRequest(proto.IslbStreamAdd, proto.ToIslbStreamAddMsg{ + nrpc.Publish(islb, proto.ToIslbStreamAddMsg{ UID: uid, RID: rid, MID: mid, StreamID: proto.StreamID(key), }) } // Send join => avp if len(avpElements) > 0 { - _, avp, npErr := getRPCForNode("avp", islb, uid, rid, mid) - if npErr != nil { - log.Errorf("error getting avp: %v", npErr) + avp, err := getNode("avp", islb, uid, rid, mid) + if err != nil { + log.Errorf("error getting avp: %v", err) } - if avp != nil { + if avp != "" { for _, stream := range sdpInfo.GetStreams() { tracks := stream.GetTracks() for _, track := range tracks { - _, npErr = avp.SyncRequest(proto.AvpProcess, proto.ToAvpProcessMsg{ - Addr: sfuID, + err = nrpc.Publish(avp, proto.ToAvpProcessMsg{ + Addr: sfu, PID: stream.GetID(), SID: string(fromIslbPeerJoinMsg.SID), TID: track.GetID(), EID: avpElements, Config: []byte{}, }) - if npErr != nil { - log.Errorf("AvpClientJoin failed %v", npErr) + if err != nil { + log.Errorf("AvpClientJoin failed %v", err) } } } @@ -177,7 +159,7 @@ func join(peer *signal.Peer, msg proto.FromClientJoinMsg) (interface{}, *nprotoo }, nil } -func leave(peer *signal.Peer, msg proto.FromClientLeaveMsg) (interface{}, *nprotoo.Error) { +func leave(peer *signal.Peer, msg proto.FromClientLeaveMsg) (interface{}, *httpError) { log.Infof("biz.leave msg=%v", msg) room := signal.GetRoom(msg.RID) if room == nil { @@ -185,14 +167,15 @@ func leave(peer *signal.Peer, msg proto.FromClientLeaveMsg) (interface{}, *nprot return nil, nil } room.DelPeer(msg.UID) + peer.Close() - islb := getRPCForIslb() - if islb == nil { + islb := getIslb() + if islb == "" { log.Errorf("islb node not found") - return nil, util.NewNpError(500, "islb node not found") + return nil, newError(500, "islb node not found") } - if _, err := islb.SyncRequest(proto.IslbPeerLeave, proto.IslbPeerLeaveMsg{ + if _, err := nrpc.Request(islb, proto.IslbPeerLeaveMsg{ RoomInfo: proto.RoomInfo{UID: msg.UID, RID: msg.RID}, }); err != nil { log.Errorf("IslbPeerLeave error: %v", err.Error()) @@ -200,13 +183,12 @@ func leave(peer *signal.Peer, msg proto.FromClientLeaveMsg) (interface{}, *nprot var mids []proto.MID if msg.MID == "" { - var fromIslbListMids proto.FromIslbListMids - if resp, err := islb.SyncRequest(proto.IslbListMids, proto.ToIslbListMids{ + if resp, err := nrpc.Request(islb, proto.ToIslbListMids{ UID: msg.UID, RID: msg.RID, }); err == nil { - if err := json.Unmarshal(resp, &fromIslbListMids); err == nil { - mids = fromIslbListMids.MIDs + if v, ok := resp.(*proto.FromIslbListMids); ok { + mids = v.MIDs } else { log.Errorf("json.Unmarshal error: %v", err) } @@ -216,16 +198,17 @@ func leave(peer *signal.Peer, msg proto.FromClientLeaveMsg) (interface{}, *nprot } else { mids = append(mids, msg.MID) } + for _, mid := range mids { - _, sfu, err := getRPCForNode("sfu", islb, msg.UID, msg.RID, mid) + sfu, err := getNode("sfu", islb, msg.UID, msg.RID, mid) if err != nil { - log.Errorf("Not found any sfu node: %d => %s", err.Code, err.Reason) + log.Errorf("Not found any sfu node: %s", err) continue } - if _, err := sfu.SyncRequest(proto.SfuClientLeave, proto.ToSfuLeaveMsg{ + if _, err := nrpc.Request(sfu, proto.ToSfuLeaveMsg{ MID: mid, }); err != nil { - log.Errorf("SfuClientLeave error %v", err.Error()) + log.Errorf("SfuClientLeave error: %v", err.Error()) continue } } @@ -233,56 +216,58 @@ func leave(peer *signal.Peer, msg proto.FromClientLeaveMsg) (interface{}, *nprot return nil, nil } -func offer(peer *signal.Peer, msg proto.ClientNegotiationMsg) (interface{}, *nprotoo.Error) { +func offer(peer *signal.Peer, msg proto.ClientOfferMsg) (interface{}, *httpError) { log.Infof("biz.offer peer.ID()=%s msg=%v", peer.ID(), msg) - _, sfu, err := getRPCForNode("sfu", nil, peer.ID(), msg.RID, msg.MID) + + sfu, err := getNode("sfu", "", peer.ID(), msg.RID, msg.MID) if err != nil { - log.Warnf("Not found any sfu node, reject: %d => %s", err.Code, err.Reason) - return nil, util.NewNpError(err.Code, err.Reason) + log.Warnf("Not found any sfu node: %s", err) + return nil, newError(500, "Not found any sfu node") } - resp, err := sfu.SyncRequest(proto.SfuClientOffer, proto.SfuNegotiationMsg{ + + resp, err := nrpc.Request(sfu, proto.SfuOfferMsg{ MID: msg.MID, RTCInfo: proto.RTCInfo{Jsep: msg.Jsep}, }) if err != nil { log.Errorf("SfuClientOnOffer failed %v", err.Error()) - return nil, util.NewNpError(err.Code, err.Reason) + return nil, newError(500, "SfuClientOnOffer failed") } - var answer proto.SfuNegotiationMsg - if err := json.Unmarshal(resp, &answer); err != nil { + answer, ok := resp.(*proto.SfuAnswerMsg) + if !ok { log.Errorf("Parse answer failed %v", err.Error()) - return nil, util.NewNpError(500, err.Error()) + return nil, newError(500, "Parse answer failed") } - return proto.ClientNegotiationMsg{ + return proto.ClientAnswerMsg{ RID: msg.RID, MID: msg.MID, RTCInfo: answer.RTCInfo, }, nil } -func answer(peer *signal.Peer, msg proto.ClientNegotiationMsg) (interface{}, *nprotoo.Error) { +func answer(peer *signal.Peer, msg proto.ClientAnswerMsg) (interface{}, *httpError) { log.Infof("biz.answer peer.ID()=%s msg=%v", peer.ID(), msg) - _, sfu, err := getRPCForNode("sfu", nil, peer.ID(), msg.RID, msg.MID) + sfu, err := getNode("sfu", "", peer.ID(), msg.RID, msg.MID) if err != nil { - log.Warnf("Not found any sfu node, reject: %d => %s", err.Code, err.Reason) - return nil, util.NewNpError(err.Code, err.Reason) + log.Warnf("Not found any sfu node: %s", err) + return nil, newError(500, "Not found any sfu node") } - if _, err := sfu.SyncRequest(proto.SfuClientAnswer, proto.SfuNegotiationMsg{ + if _, err := nrpc.Request(sfu, proto.SfuAnswerMsg{ MID: msg.MID, RTCInfo: msg.RTCInfo, }); err != nil { log.Errorf("SfuClientOnAnswer failed %v", err.Error()) - return nil, util.NewNpError(err.Code, err.Reason) + return nil, newError(500, err.Error()) } return nil, nil } -func broadcast(peer *signal.Peer, msg proto.FromClientBroadcastMsg) (interface{}, *nprotoo.Error) { +func broadcast(peer *signal.Peer, msg proto.FromClientBroadcastMsg) (interface{}, *httpError) { log.Infof("biz.broadcast peer.ID()=%s msg=%v", peer.ID(), msg) // Validate @@ -290,33 +275,43 @@ func broadcast(peer *signal.Peer, msg proto.FromClientBroadcastMsg) (interface{} return nil, ridError } - islb := getRPCForIslb() - if islb == nil { - return nil, util.NewNpError(500, "Not found any node for islb.") + islb := getIslb() + if islb == "" { + return nil, newError(500, "Not found any node for islb.") } - islb.AsyncRequest(proto.IslbBroadcast, proto.IslbBroadcastMsg{ + + err := nrpc.Publish(islb, proto.IslbBroadcastMsg{ RoomInfo: proto.RoomInfo{UID: peer.ID(), RID: msg.RID}, Info: msg.Info, }) + if err != nil { + log.Errorf("Broadcast error: %s", err.Error()) + return nil, newError(500, "Broadcast error") + } + return emptyMap, nil } -func trickle(peer *signal.Peer, msg proto.ClientTrickleMsg) (interface{}, *nprotoo.Error) { +func trickle(peer *signal.Peer, msg proto.ClientTrickleMsg) (interface{}, *httpError) { log.Infof("biz.trickle peer.ID()=%s msg=%v", peer.ID(), msg) - // Validate if msg.RID == "" { return nil, ridError } - _, sfu, err := getRPCForNode("sfu", nil, peer.ID(), msg.RID, msg.MID) + sfu, err := getNode("sfu", "", peer.ID(), msg.RID, msg.MID) if err != nil { - log.Warnf("Not found any sfu node, reject: %d => %s", err.Code, err.Reason) - return nil, util.NewNpError(err.Code, err.Reason) + log.Warnf("Not found any sfu node: %s", err.Error()) + return nil, newError(500, "Not found any sfu node") } - sfu.AsyncRequest(proto.ClientTrickleICE, proto.SfuTrickleMsg{ + err = nrpc.Publish(sfu, proto.SfuTrickleMsg{ MID: msg.MID, Candidate: msg.Candidate, }) + if err != nil { + log.Errorf("Send trickle to sfu error: %s", err.Error()) + return nil, newError(500, "Send trickle to sfu error") + } + return emptyMap, nil } diff --git a/pkg/node/biz/dispatch.go b/pkg/node/biz/dispatch.go index 3eee22f97..2d9e57c72 100644 --- a/pkg/node/biz/dispatch.go +++ b/pkg/node/biz/dispatch.go @@ -5,25 +5,23 @@ import ( "fmt" "net/http" - nprotoo "github.com/cloudwebrtc/nats-protoo" "github.com/dgrijalva/jwt-go" log "github.com/pion/ion-log" "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/signal" - "github.com/pion/ion/pkg/util" ) var ( - errorTokenRequired = util.NewNpError(http.StatusUnauthorized, "Authorization token required for access") - errorInvalidRoomToken = util.NewNpError(http.StatusUnauthorized, "Invalid room token") - errorUnauthorizedRoomAccess = util.NewNpError(http.StatusForbidden, "Permission not sufficient for room") + errorTokenRequired = newError(http.StatusUnauthorized, "Authorization token required for access") + errorInvalidRoomToken = newError(http.StatusUnauthorized, "Invalid room token") + errorUnauthorizedRoomAccess = newError(http.StatusForbidden, "Permission not sufficient for room") ) // ParseProtoo Unmarshals a protoo payload. -func ParseProtoo(msg json.RawMessage, connectionClaims *signal.Claims, msgType interface{}) *nprotoo.Error { +func ParseProtoo(msg json.RawMessage, connectionClaims *signal.Claims, msgType interface{}) *httpError { if err := json.Unmarshal(msg, &msgType); err != nil { log.Errorf("Biz.Entry parse error %v", err.Error()) - return util.NewNpError(http.StatusBadRequest, fmt.Sprintf("Error parsing request object %v", err.Error())) + return newError(http.StatusBadRequest, fmt.Sprintf("Error parsing request object %v", err.Error())) } authenticatable, ok := msgType.(proto.Authenticatable) @@ -37,7 +35,7 @@ func ParseProtoo(msg json.RawMessage, connectionClaims *signal.Claims, msgType i // authenticateRoom checks both the connection token AND an optional message token for RID claims // returns nil for success and returns an error if there are no valid claims for the RID -func authenticateRoom(msgType interface{}, connectionClaims *signal.Claims, authenticatable proto.Authenticatable) *nprotoo.Error { +func authenticateRoom(msgType interface{}, connectionClaims *signal.Claims, authenticatable proto.Authenticatable) *httpError { log.Debugf("authenticateRoom: checking claims on token %v", authenticatable.Token()) // Connection token has valid claim on this room, succeed early if connectionClaims != nil && authenticatable.Room() == proto.RID(connectionClaims.RID) { @@ -75,43 +73,43 @@ func authenticateRoom(msgType interface{}, connectionClaims *signal.Claims, auth // Entry is the biz entry func Entry(method string, peer *signal.Peer, msg json.RawMessage, accept signal.RespondFunc, reject signal.RejectFunc) { var result interface{} - topErr := util.NewNpError(http.StatusBadRequest, fmt.Sprintf("Unkown method [%s]", method)) + err := newError(http.StatusBadRequest, fmt.Sprintf("Unkown method [%s]", method)) switch method { case proto.ClientJoin: var msgData proto.FromClientJoinMsg - if topErr = ParseProtoo(msg, peer.Claims(), &msgData); topErr == nil { - result, topErr = join(peer, msgData) + if err = ParseProtoo(msg, peer.Claims(), &msgData); err == nil { + result, err = join(peer, msgData) } case proto.ClientOffer: - var msgData proto.ClientNegotiationMsg - if topErr = ParseProtoo(msg, peer.Claims(), &msgData); topErr == nil { - result, topErr = offer(peer, msgData) + var msgData proto.ClientOfferMsg + if err = ParseProtoo(msg, peer.Claims(), &msgData); err == nil { + result, err = offer(peer, msgData) } case proto.ClientAnswer: - var msgData proto.ClientNegotiationMsg - if topErr = ParseProtoo(msg, peer.Claims(), &msgData); topErr == nil { - result, topErr = answer(peer, msgData) + var msgData proto.ClientAnswerMsg + if err = ParseProtoo(msg, peer.Claims(), &msgData); err == nil { + result, err = answer(peer, msgData) } case proto.ClientTrickleICE: var msgData proto.ClientTrickleMsg - if topErr = ParseProtoo(msg, peer.Claims(), &msgData); topErr == nil { - result, topErr = trickle(peer, msgData) + if err = ParseProtoo(msg, peer.Claims(), &msgData); err == nil { + result, err = trickle(peer, msgData) } case proto.ClientBroadcast: var msgData proto.FromClientBroadcastMsg - if topErr = ParseProtoo(msg, peer.Claims(), &msgData); topErr == nil { - result, topErr = broadcast(peer, msgData) + if err = ParseProtoo(msg, peer.Claims(), &msgData); err == nil { + result, err = broadcast(peer, msgData) } case proto.ClientLeave: var msgData proto.FromClientLeaveMsg - if topErr = ParseProtoo(msg, peer.Claims(), &msgData); topErr == nil { - result, topErr = leave(peer, msgData) + if err = ParseProtoo(msg, peer.Claims(), &msgData); err == nil { + result, err = leave(peer, msgData) } } - if topErr != nil { - reject(topErr.Code, topErr.Reason) + if err != nil { + reject(err.Code, err.Reason) } else { accept(result) } diff --git a/pkg/node/biz/error_codes.go b/pkg/node/biz/error.go similarity index 78% rename from pkg/node/biz/error_codes.go rename to pkg/node/biz/error.go index e70c1fb92..6ee34513d 100644 --- a/pkg/node/biz/error_codes.go +++ b/pkg/node/biz/error.go @@ -27,8 +27,21 @@ var codeErr = map[int]string{ codePublishErr: "publish failed", } +type httpError struct { + Code int + Reason string +} + func codeStr(code int) string { return codeErr[code] } var emptyMap = map[string]interface{}{} + +func newError(code int, reason string) *httpError { + err := httpError{ + Code: code, + Reason: reason, + } + return &err +} diff --git a/pkg/node/biz/init.go b/pkg/node/biz/init.go index bb1d6f168..58c5b0663 100644 --- a/pkg/node/biz/init.go +++ b/pkg/node/biz/init.go @@ -1,10 +1,13 @@ package biz import ( - nprotoo "github.com/cloudwebrtc/nats-protoo" + "sync" + + "github.com/nats-io/nats.go" log "github.com/pion/ion-log" conf "github.com/pion/ion/pkg/conf/biz" "github.com/pion/ion/pkg/discovery" + "github.com/pion/ion/pkg/proto" ) var ( @@ -12,59 +15,73 @@ var ( dc = "default" //nolint:unused nid = "biz-unkown-node-id" - protoo *nprotoo.NatsProtoo - rpcs map[string]*nprotoo.Requestor - services map[string]discovery.Node + subs map[string]*nats.Subscription + nodeLock sync.RWMutex + nodes map[string]discovery.Node roomAuth conf.AuthConfig avpElements []string + nrpc *proto.NatsRPC ) -// Init func -func Init(dcID, nodeID, rpcID, eventID string, natsURL string, authConf conf.AuthConfig, elements []string) { +// Init biz +func Init(dcID, nodeID, natsURL string, authConf conf.AuthConfig, elements []string) { dc = dcID nid = nodeID - services = make(map[string]discovery.Node) - rpcs = make(map[string]*nprotoo.Requestor) - protoo = nprotoo.NewNatsProtoo(natsURL) + nodes = make(map[string]discovery.Node) + subs = make(map[string]*nats.Subscription) + nrpc = proto.NewNatsRPC(natsURL) roomAuth = authConf avpElements = elements } -// WatchServiceNodes . -func WatchServiceNodes(service string, state discovery.NodeStateType, node discovery.Node) { +// Close nats rpc +func Close() { + closeSubs() + nrpc.Close() +} + +// WatchIslbNodes watch islb nodes up/down +func WatchIslbNodes(service string, state discovery.NodeStateType, node discovery.Node) { + nodeLock.Lock() + defer nodeLock.Unlock() + id := node.ID if state == discovery.UP { - - if _, found := services[id]; !found { - services[id] = node + if _, found := nodes[id]; !found { + nodes[id] = node } service := node.Info["service"] name := node.Info["name"] - log.Debugf("Service [%s] %s => %s", service, name, id) + log.Debugf("service [%s] %s => %s", service, name, id) - _, found := rpcs[id] + _, found := subs[id] if !found { - rpcID := discovery.GetRPCChannel(node) - eventID := discovery.GetEventChannel(node) - - log.Infof("Create islb requestor: rpcID => [%s]", rpcID) - rpcs[id] = protoo.NewRequestor(rpcID) - - log.Infof("handleIslbBroadCast: eventID => [%s]", eventID) - protoo.OnBroadcast(eventID, handleIslbBroadcast) + islb := node.Info["id"] + log.Infof("subscribe islb: %s", islb) + if sub, err := nrpc.Subscribe(islb+"-event", handleIslbBroadcast); err == nil { + subs[id] = sub + } else { + log.Errorf("subcribe error: %v", err) + } } - } else if state == discovery.DOWN { - delete(rpcs, id) - delete(services, id) + delete(subs, id) + delete(nodes, id) } } -// Close func -func Close() { - if protoo != nil { - protoo.Close() +func getNodes() map[string]discovery.Node { + nodeLock.RLock() + defer nodeLock.RUnlock() + return nodes +} + +func closeSubs() { + nodeLock.Lock() + defer nodeLock.Unlock() + for _, s := range subs { + s.Unsubscribe() } } diff --git a/pkg/node/biz/internal.go b/pkg/node/biz/internal.go index ef514e0ab..22f74bdc3 100644 --- a/pkg/node/biz/internal.go +++ b/pkg/node/biz/internal.go @@ -1,91 +1,79 @@ package biz import ( - "encoding/json" + "errors" - nprotoo "github.com/cloudwebrtc/nats-protoo" log "github.com/pion/ion-log" - "github.com/pion/ion/pkg/discovery" "github.com/pion/ion/pkg/proto" "github.com/pion/ion/pkg/signal" - "github.com/pion/ion/pkg/util" ) -func handleIslbBroadcast(msg nprotoo.Notification, subj string) { - var isblSignalTransformMap = map[string]string{ - proto.IslbStreamAdd: proto.ClientOnStreamAdd, - proto.IslbPeerJoin: proto.ClientOnJoin, - proto.IslbPeerLeave: proto.ClientOnLeave, - proto.IslbBroadcast: proto.ClientBroadcast, - } - go func(msg nprotoo.Notification) { - var data struct { - UID proto.UID `json:"uid"` - RID proto.RID `json:"rid"` - } - if err := msg.Data.Unmarshal(&data); err != nil { - log.Errorf("Error parsing message %v", err) - return +func handleIslbBroadcast(msg interface{}) (interface{}, error) { + go func(msg interface{}) { + log.Infof("handle islb message: %v", msg) + + var method string + var rid proto.RID + var uid proto.UID + + switch v := msg.(type) { + case *proto.FromIslbStreamAddMsg: + method, rid, uid = proto.ClientOnStreamAdd, v.RID, v.UID + case *proto.ToClientPeerJoinMsg: + method, rid, uid = proto.ClientOnJoin, v.RID, v.UID + case *proto.IslbPeerLeaveMsg: + method, rid, uid = proto.ClientOnLeave, v.RID, v.UID + case *proto.IslbBroadcastMsg: + method, rid, uid = proto.ClientBroadcast, v.RID, v.UID + default: + log.Warnf("unkonw message: %v", msg) } - log.Infof("OnIslbBroadcast: method=%s, data=%v", msg.Method, string(msg.Data)) - if newMethod, ok := isblSignalTransformMap[msg.Method]; ok { - if r := signal.GetRoom(data.RID); r != nil { - r.NotifyWithoutID(newMethod, msg.Data, data.UID) - } else { - log.Warnf("room not exits, rid=%s, uid=%, method=%s, msg=%s", data.RID, data.UID, newMethod, msg.Data) - } + log.Infof("broadcast: method=%s, msg=%v", method, msg) + if r := signal.GetRoom(rid); r != nil { + r.NotifyWithoutID(method, msg, uid) + } else { + log.Warnf("room not exits, rid=%s, uid=%", rid, uid) } + }(msg) + + return nil, nil } -func getRPCForIslb() *nprotoo.Requestor { - for _, item := range services { +func getIslb() string { + nodes := getNodes() + for _, item := range nodes { if item.Info["service"] == "islb" { - id := item.Info["id"] - rpc, found := rpcs[id] - if !found { - rpcID := discovery.GetRPCChannel(item) - log.Infof("Create rpc [%s] for islb", rpcID) - rpc = protoo.NewRequestor(rpcID) - rpcs[id] = rpc - } - return rpc + return item.Info["id"] } } log.Warnf("No islb node was found.") - return nil + return "" } -func getRPCForNode(service string, islb *nprotoo.Requestor, uid proto.UID, rid proto.RID, mid proto.MID) (string, *nprotoo.Requestor, *nprotoo.Error) { - if islb == nil { - if islb = getRPCForIslb(); islb == nil { - return "", nil, util.NewNpError(500, "Not found islb.") +func getNode(service string, islb string, uid proto.UID, rid proto.RID, mid proto.MID) (string, error) { + if islb == "" { + if islb = getIslb(); islb == "" { + return "", errors.New("Not found islb") } } - result, err := islb.SyncRequest(proto.IslbFindNode, proto.ToIslbFindNodeMsg{ + resp, err := nrpc.Request(islb, proto.ToIslbFindNodeMsg{ Service: service, UID: uid, RID: rid, MID: mid, }) - if err != nil { - return "", nil, util.NewNpError(500, "Not found "+service) - } - var answer proto.FromIslbFindNodeMsg - if err := json.Unmarshal(result, &answer); err != nil { - return "", nil, &nprotoo.Error{Code: 123, Reason: "Unmarshal error getRPCForNode"} + if err != nil { + return "", err } - log.Infof("IslbFindNode result => %v", answer) - rpcID := answer.RPCID - rpc, found := rpcs[rpcID] - if !found { - rpc = protoo.NewRequestor(rpcID) - rpcs[rpcID] = rpc + msg, ok := resp.(*proto.FromIslbFindNodeMsg) + if !ok { + return "", errors.New("parse islb-find-node msg error") } - return answer.ID, rpc, nil + return msg.ID, nil } diff --git a/pkg/node/islb/init.go b/pkg/node/islb/init.go index 62ca54485..505a1bf3f 100644 --- a/pkg/node/islb/init.go +++ b/pkg/node/islb/init.go @@ -3,9 +3,10 @@ package islb import ( "time" - nprotoo "github.com/cloudwebrtc/nats-protoo" + log "github.com/pion/ion-log" "github.com/pion/ion/pkg/db" "github.com/pion/ion/pkg/discovery" + "github.com/pion/ion/pkg/proto" ) const ( @@ -15,20 +16,40 @@ const ( var ( dc = "default" //nolint:unused - nid = "islb-unkown-node-id" - protoo *nprotoo.NatsProtoo - redis *db.Redis - services map[string]discovery.Node - broadcaster *nprotoo.Broadcaster - ) + nid = "islb-unkown-node-id" + bid string + nrpc *proto.NatsRPC + redis *db.Redis + services map[string]discovery.Node +) // Init func -func Init(dcID, nodeID, rpcID, eventID string, redisCfg db.Config, etcd []string, natsURL string) { +func Init(dcID, nodeID string, redisCfg db.Config, etcd []string, natsURL string) { dc = dcID nid = nodeID + bid = nodeID + "-event" redis = db.NewRedis(redisCfg) - protoo = nprotoo.NewNatsProtoo(natsURL) - broadcaster = protoo.NewBroadcaster(eventID) + nrpc = proto.NewNatsRPC(natsURL) services = make(map[string]discovery.Node) - handleRequest(rpcID) + handleRequest(nid) +} + +// WatchServiceNodes . +func WatchServiceNodes(service string, state discovery.NodeStateType, node discovery.Node) { + id := node.ID + if state == discovery.UP { + if _, found := services[id]; !found { + services[id] = node + service := node.Info["service"] + name := node.Info["name"] + log.Debugf("Service [%s] UP %s => %s", service, name, id) + } + } else if state == discovery.DOWN { + if _, found := services[id]; found { + service := node.Info["service"] + name := node.Info["name"] + log.Debugf("Service [%s] DOWN %s => %s", service, name, id) + delete(services, id) + } + } } diff --git a/pkg/node/islb/internal.go b/pkg/node/islb/internal.go index 229487fa4..21560e3f0 100644 --- a/pkg/node/islb/internal.go +++ b/pkg/node/islb/internal.go @@ -2,39 +2,46 @@ package islb import ( "encoding/json" + "errors" "fmt" "hash/adler32" "math" - nprotoo "github.com/cloudwebrtc/nats-protoo" log "github.com/pion/ion-log" - "github.com/pion/ion/pkg/discovery" "github.com/pion/ion/pkg/proto" - "github.com/pion/ion/pkg/util" ) -// WatchServiceNodes . -func WatchServiceNodes(service string, state discovery.NodeStateType, node discovery.Node) { - id := node.ID - if state == discovery.UP { - if _, found := services[id]; !found { - services[id] = node - service := node.Info["service"] - name := node.Info["name"] - log.Debugf("Service [%s] UP %s => %s", service, name, id) - } - } else if state == discovery.DOWN { - if _, found := services[id]; found { - service := node.Info["service"] - name := node.Info["name"] - log.Debugf("Service [%s] DOWN %s => %s", service, name, id) - delete(services, id) +func handleRequest(rpcID string) { + log.Infof("handleRequest: rpcID => [%s]", rpcID) + + _, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { + log.Infof("handleRequest: %T, %+v", msg, msg) + + switch v := msg.(type) { + case *proto.ToIslbFindNodeMsg: + return findNode(v) + case *proto.ToIslbPeerJoinMsg: + return peerJoin(v) + case *proto.IslbPeerLeaveMsg: + return peerLeave(v) + case *proto.ToIslbStreamAddMsg: + return streamAdd(v) + case *proto.IslbBroadcastMsg: + return broadcast(v) + case *proto.ToIslbListMids: + return listMids(v) + default: + return nil, errors.New("unkonw message") } + }) + + if err != nil { + log.Errorf("nrpc subscribe error: %v", err) } } // Find service nodes by name, such as sfu|avp|sip-gateway|rtmp-gateway -func findNode(data proto.ToIslbFindNodeMsg) (interface{}, *nprotoo.Error) { +func findNode(data *proto.ToIslbFindNodeMsg) (interface{}, error) { service := data.Service if data.RID != "" && data.UID != "" && data.MID != "" { @@ -44,76 +51,62 @@ func findNode(data proto.ToIslbFindNodeMsg) (interface{}, *nprotoo.Error) { UID: data.UID, MID: data.MID, }.BuildKey() - log.Infof("Find mids by mkey %s", mkey) + log.Infof("find mids by mkey: %s", mkey) for _, key := range redis.Keys(mkey + "*") { - log.Infof("Got: key => %s", key) + log.Infof("got: key => %s", key) minfo, err := proto.ParseMediaInfo(key) if err != nil { - break + log.Warnf("parse media info error: %v", key) + continue } for _, node := range services { - name := node.Info["name"] id := node.Info["id"] if service == node.Info["service"] && minfo.NID == id { - rpcID := discovery.GetRPCChannel(node) - eventID := discovery.GetEventChannel(node) - resp := proto.FromIslbFindNodeMsg{Name: name, RPCID: rpcID, EventID: eventID, Service: service, ID: id} - log.Infof("findServiceNode: by node ID %s, [%s] %s => %s", minfo.NID, service, name, rpcID) - return resp, nil + log.Infof("found node by rid=% & uid=%s & mid=%s : %v", data.RID, data.UID, data.MID, node) + return proto.FromIslbFindNodeMsg{ID: id}, nil } } } } // MID/RID Doesn't exist in Redis - // Find least packed SFU to return + // Find least packed node to return nid := "" minStreamCount := math.MaxInt32 for _, node := range services { if service == node.Info["service"] { // get stream count - sfuKey := proto.MediaInfo{ + nkey := proto.MediaInfo{ DC: dc, NID: node.Info["id"], }.BuildKey() - streamCount := len(redis.Keys(sfuKey)) + streamCount := len(redis.Keys(nkey)) - log.Infof("findServiceNode looking up sfu stream count [%s] = %v", sfuKey, streamCount) + log.Infof("looking up node stream count: [%s] = %v", nkey, streamCount) if streamCount <= minStreamCount { nid = node.ID minStreamCount = streamCount } } } - log.Infof("findServiceNode: selecting SFU [%s] = %v", nid, minStreamCount) + log.Infof("selecting node: [%s] = %v", nid, minStreamCount) if node, ok := services[nid]; ok { - log.Infof("findServiceNode: found best candidate SFU [%s]", node) - rpcID := discovery.GetRPCChannel(node) - eventID := discovery.GetEventChannel(node) - name := node.Info["name"] - id := node.Info["id"] - resp := proto.FromIslbFindNodeMsg{Name: name, RPCID: rpcID, EventID: eventID, Service: service, ID: id} - log.Infof("findServiceNode: [%s] %s => %s", service, name, rpcID) - return resp, nil + log.Infof("found best node: %v", node) + return proto.FromIslbFindNodeMsg{ID: node.Info["id"]}, nil } // TODO: Add a load balancing algorithm. for _, node := range services { if service == node.Info["service"] { - rpcID := discovery.GetRPCChannel(node) - eventID := discovery.GetEventChannel(node) - name := node.Info["name"] - id := node.Info["id"] - resp := proto.FromIslbFindNodeMsg{Name: name, RPCID: rpcID, EventID: eventID, Service: service, ID: id} - log.Infof("findServiceNode: [%s] %s => %s", service, name, rpcID) - return resp, nil + log.Infof("found node: %v", node) + return proto.FromIslbFindNodeMsg{ID: node.Info["id"]}, nil } } - return nil, util.NewNpError(404, fmt.Sprintf("Service node [%s] not found", service)) + return nil, errors.New("service node not found") } -func streamAdd(data proto.ToIslbStreamAddMsg) (interface{}, *nprotoo.Error) { +func streamAdd(data *proto.ToIslbStreamAddMsg) (interface{}, error) { mkey := proto.MediaInfo{ DC: dc, RID: data.RID, @@ -136,22 +129,23 @@ func streamAdd(data proto.ToIslbStreamAddMsg) (interface{}, *nprotoo.Error) { field = "track/" + string(data.StreamID) // The value here actually doesn't matter, so just store the associated MID in case it's useful in the future. - log.Infof("SetTrackField: mkey, field, value = %s, %s, %s", mkey, field, data.MID) + log.Infof("stores track: mkey, field, value = %s, %s, %s", mkey, field, data.MID) err = redis.HSetTTL(mkey, field, string(data.MID), redisLongKeyTTL) if err != nil { log.Errorf("redis.HSetTTL err = %v", err) } - log.Infof("Broadcast: [stream-add] => %v", data) - broadcaster.Say(proto.IslbStreamAdd, proto.FromIslbStreamAddMsg{ + log.Infof("broadcast: [stream-add] => %v", data) + err = nrpc.Publish(bid, proto.FromIslbStreamAddMsg{ RID: data.RID, UID: data.UID, Stream: proto.Stream{UID: data.UID, StreamID: data.StreamID}, }) - return struct{}{}, nil + + return nil, err } -func listMids(data proto.ToIslbListMids) (interface{}, *nprotoo.Error) { +func listMids(data *proto.ToIslbListMids) (interface{}, error) { mkey := proto.MediaInfo{ DC: dc, RID: data.RID, @@ -170,7 +164,7 @@ func listMids(data proto.ToIslbListMids) (interface{}, *nprotoo.Error) { return proto.FromIslbListMids{MIDs: mids}, nil } -func peerJoin(msg proto.ToIslbPeerJoinMsg) (interface{}, *nprotoo.Error) { +func peerJoin(msg *proto.ToIslbPeerJoinMsg) (interface{}, error) { ukey := proto.UserInfo{ DC: dc, RID: msg.RID, @@ -179,9 +173,12 @@ func peerJoin(msg proto.ToIslbPeerJoinMsg) (interface{}, *nprotoo.Error) { log.Infof("clientJoin: set %s => %v", ukey, string(msg.Info)) // Tell everyone about the new peer. - broadcaster.Say(proto.IslbPeerJoin, proto.ToClientPeerJoinMsg{ + if err := nrpc.Publish(bid, proto.ToClientPeerJoinMsg{ UID: msg.UID, RID: msg.RID, Info: msg.Info, - }) + }); err != nil { + log.Errorf("broadcast peer-join error: %v", err) + return nil, err + } // Tell the new peer about everyone currently in the room. searchKey := proto.UserInfo{ @@ -267,7 +264,7 @@ func peerJoin(msg proto.ToIslbPeerJoinMsg) (interface{}, *nprotoo.Error) { }, nil } -func peerLeave(data proto.IslbPeerLeaveMsg) (interface{}, *nprotoo.Error) { +func peerLeave(data *proto.IslbPeerLeaveMsg) (interface{}, error) { ukey := proto.UserInfo{ DC: dc, RID: data.RID, @@ -278,102 +275,19 @@ func peerLeave(data proto.IslbPeerLeaveMsg) (interface{}, *nprotoo.Error) { if err != nil { log.Errorf("redis.Del err = %v", err) } - broadcaster.Say(proto.IslbPeerLeave, proto.IslbPeerLeaveMsg(data)) - return struct{}{}, nil -} -// func relay(data map[string]interface{}) (interface{}, *nprotoo.Error) { -// rid := util.Val(data, "rid") -// mid := util.Val(data, "mid") -// from := util.Val(data, "from") - -// key := proto.GetPubNodePath(rid, mid) -// info := redis.HGetAll(key) -// for ip := range info { -// method := util.Map("method", proto.IslbRelay, "sid", from, "mid", mid) -// log.Infof("amqp.RpcCall ip=%s, method=%v", ip, method) -// //amqp.RpcCall(ip, method, "") -// } -// return struct{}{}, nil -// } - -// func unRelay(data map[string]interface{}) (interface{}, *nprotoo.Error) { -// rid := util.Val(data, "rid") -// mid := util.Val(data, "mid") -// from := util.Val(data, "from") - -// key := proto.GetPubNodePath(rid, mid) -// info := redis.HGetAll(key) -// for ip := range info { -// method := util.Map("method", proto.IslbUnrelay, "mid", mid, "sid", from) -// log.Infof("amqp.RpcCall ip=%s, method=%v", ip, method) -// //amqp.RpcCall(ip, method, "") -// } -// // time.Sleep(time.Millisecond * 10) -// resp := util.Map("mid", mid, "sid", from) -// log.Infof("unRelay: resp=%v", resp) -// return resp, nil -// } - -func broadcast(data proto.IslbBroadcastMsg) (interface{}, *nprotoo.Error) { - broadcaster.Say(proto.IslbBroadcast, proto.IslbBroadcastMsg(data)) - return struct{}{}, nil + if err := nrpc.Publish(bid, data); err != nil { + log.Errorf("broadcast peer-leave error: %v", err) + return nil, err + } + + return nil, nil } -func handleRequest(rpcID string) { - log.Infof("handleRequest: rpcID => [%v]", rpcID) - - protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { - go func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { - method := request.Method - msg := request.Data - log.Infof("handleRequest: method => %s, data => %s", method, msg) - - var result interface{} - err := util.NewNpError(400, fmt.Sprintf("Unkown method [%s]", method)) - - switch method { - case proto.IslbFindNode: - var msgData proto.ToIslbFindNodeMsg - if err = msg.Unmarshal(&msgData); err == nil { - result, err = findNode(msgData) - } - case proto.IslbPeerJoin: - var msgData proto.ToIslbPeerJoinMsg - if err = msg.Unmarshal(&msgData); err == nil { - result, err = peerJoin(msgData) - } - case proto.IslbPeerLeave: - var msgData proto.IslbPeerLeaveMsg - if err = msg.Unmarshal(&msgData); err == nil { - result, err = peerLeave(msgData) - } - case proto.IslbStreamAdd: - var msgData proto.ToIslbStreamAddMsg - if err = msg.Unmarshal(&msgData); err == nil { - result, err = streamAdd(msgData) - } - // case proto.IslbRelay: - // result, err = relay(data) - // case proto.IslbUnrelay: - // result, err = unRelay(data) - case proto.IslbBroadcast: - var msgData proto.IslbBroadcastMsg - if err = msg.Unmarshal(&msgData); err == nil { - result, err = broadcast(msgData) - } - case proto.IslbListMids: - var msgData proto.ToIslbListMids - if err = msg.Unmarshal(&msgData); err == nil { - result, err = listMids(msgData) - } - } +func broadcast(data *proto.IslbBroadcastMsg) (interface{}, error) { + if err := nrpc.Publish(bid, data); err != nil { + log.Errorf("broadcast message error: %v", err) + } - if err != nil { - reject(err.Code, err.Reason) - } else { - accept(result) - } - }(request, accept, reject) - }) + return nil, nil } diff --git a/pkg/node/sfu/init.go b/pkg/node/sfu/init.go index b937e0638..59c972e47 100644 --- a/pkg/node/sfu/init.go +++ b/pkg/node/sfu/init.go @@ -1,21 +1,21 @@ package sfu import ( - nprotoo "github.com/cloudwebrtc/nats-protoo" + "github.com/pion/ion/pkg/proto" ) var ( //nolint:unused dc = "default" //nolint:unused - nid = "sfu-unkown-node-id" - protoo *nprotoo.NatsProtoo + nid = "sfu-unkown-node-id" + nrpc *proto.NatsRPC ) // Init func -func Init(dcID, nodeID, rpcID, eventID, natsURL string) { +func Init(dcID, nodeID, natsURL string) { dc = dcID nid = nodeID - protoo = nprotoo.NewNatsProtoo(natsURL) - handleRequest(rpcID) + nrpc = proto.NewNatsRPC(natsURL) + handleRequest(nodeID) } diff --git a/pkg/node/sfu/internal.go b/pkg/node/sfu/internal.go index 48eb5e7ab..828aa204a 100644 --- a/pkg/node/sfu/internal.go +++ b/pkg/node/sfu/internal.go @@ -1,13 +1,11 @@ package sfu import ( - "fmt" + "errors" - nprotoo "github.com/cloudwebrtc/nats-protoo" log "github.com/pion/ion-log" isfu "github.com/pion/ion-sfu/pkg" "github.com/pion/ion/pkg/proto" - "github.com/pion/ion/pkg/util" "github.com/pion/webrtc/v3" ) @@ -19,147 +17,137 @@ func InitSFU(config *isfu.Config) { } func handleRequest(rpcID string) { - log.Infof("handleRequest: rpcID => [%v]", rpcID) - protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) { - method := request.Method - data := request.Data - log.Infof("handleRequest: method => %s, data => %s", method, data) - - var result interface{} - err := util.NewNpError(400, fmt.Sprintf("Unknown method [%s]", method)) - - switch method { - case proto.SfuClientJoin: - var msgData proto.ToSfuJoinMsg - if err = data.Unmarshal(&msgData); err == nil { - result, err = join(msgData) - } - case proto.SfuClientOffer: - var msgData proto.SfuNegotiationMsg - if err = data.Unmarshal(&msgData); err == nil { - result, err = offer(msgData) - } - case proto.SfuClientAnswer: - var msgData proto.SfuNegotiationMsg - if err = data.Unmarshal(&msgData); err == nil { - result, err = answer(msgData) - } - case proto.SfuClientTrickle: - var msgData proto.SfuTrickleMsg - if err = data.Unmarshal(&msgData); err == nil { - result, err = trickle(msgData) - } - case proto.SfuClientLeave: - var msgData proto.ToSfuLeaveMsg - if err = data.Unmarshal(&msgData); err == nil { - result, err = leave(msgData) - } - } - - if err != nil { - reject(err.Code, err.Reason) - } else { - accept(result) + log.Infof("handleRequest: rpcID => [%s]", rpcID) + + _, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) { + log.Infof("handleRequest: %T, %+v", msg, msg) + + switch v := msg.(type) { + case *proto.ToSfuJoinMsg: + return join(v) + case *proto.SfuOfferMsg: + return offer(v) + case *proto.SfuAnswerMsg: + return answer(v) + case *proto.SfuTrickleMsg: + return trickle(v) + case *proto.ToSfuLeaveMsg: + return leave(v) + default: + return nil, errors.New("unkonw message") } }) + + if err != nil { + log.Errorf("nrpc subscribe error: %v", err) + } } -func join(msg proto.ToSfuJoinMsg) (interface{}, *nprotoo.Error) { +func join(msg *proto.ToSfuJoinMsg) (interface{}, error) { log.Infof("join msg=%v", msg) - if msg.Jsep.SDP == "" { - return nil, util.NewNpError(415, "publish: jsep invaild.") - } peer := s.addPeer(msg.MID) answer, err := peer.Join(string(msg.SID), msg.Jsep) if err != nil { log.Errorf("join error: %v", err) - return nil, util.NewNpError(415, "join error") + return nil, err } peer.OnOffer = func(offer *webrtc.SessionDescription) { - log.Infof("OnOffer: %v", offer) - protoo.NewRequestor(msg.RPCID).AsyncRequest(proto.SfuClientOffer, proto.SfuNegotiationMsg{ + data := proto.SfuOfferMsg{ MID: msg.MID, RTCInfo: proto.RTCInfo{Jsep: *offer}, - }) + } + log.Infof("send offer to [%s]: %v", msg.RPCID, data) + if err := nrpc.Publish(msg.RPCID, data); err != nil { + log.Errorf("send offer: %v", err) + } } peer.OnIceCandidate = func(candidate *webrtc.ICECandidateInit) { - log.Infof("OnIceCandidate: %v", candidate) - protoo.NewRequestor(msg.RPCID).AsyncRequest(proto.SfuTrickleICE, proto.SfuTrickleMsg{ + data := proto.SfuTrickleMsg{ MID: msg.MID, Candidate: *candidate, - }) + } + log.Infof("send candidate to [%s]: %v", msg.RPCID, data) + if err := nrpc.Publish(msg.RPCID, data); err != nil { + log.Errorf("send candidate to [%s] error: %v", msg.RPCID, err) + } } resp := proto.FromSfuJoinMsg{RTCInfo: proto.RTCInfo{Jsep: *answer}} + + log.Infof("reply join: %v", resp) + return resp, nil } -func offer(msg proto.SfuNegotiationMsg) (interface{}, *nprotoo.Error) { +func offer(msg *proto.SfuOfferMsg) (interface{}, error) { log.Infof("offer msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) - return nil, util.NewNpError(415, "peer not found") + return nil, errors.New("peer not found") } answer, err := peer.Answer(msg.Jsep) if err != nil { log.Errorf("peer.Answer: %v", err) - return nil, util.NewNpError(415, "peer.Answer error") + return nil, errors.New("peer.Answer error") } - resp := proto.SfuNegotiationMsg{ + resp := proto.SfuAnswerMsg{ MID: msg.MID, RTCInfo: proto.RTCInfo{Jsep: *answer}, } + + log.Infof("reply answer: %v", resp) + return resp, nil } -func leave(msg proto.ToSfuLeaveMsg) (interface{}, *nprotoo.Error) { +func leave(msg *proto.ToSfuLeaveMsg) (interface{}, error) { log.Infof("leave msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) - return nil, util.NewNpError(415, "peer not found") + return nil, errors.New("peer not found") } s.delPeer(msg.MID) if err := peer.Close(); err != nil { - return nil, util.NewNpError(415, "failed to close peer") + return nil, errors.New("failed to close peer") } return nil, nil } -func answer(msg proto.SfuNegotiationMsg) (interface{}, *nprotoo.Error) { +func answer(msg *proto.SfuAnswerMsg) (interface{}, error) { log.Infof("answer msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) - return nil, util.NewNpError(415, "peer not found") + return nil, errors.New("peer not found") } if err := peer.SetRemoteDescription(msg.Jsep); err != nil { log.Errorf("set remote description error: %v", err) - return nil, util.NewNpError(415, "set remote description error") + return nil, errors.New("set remote description error") } return nil, nil } -func trickle(msg proto.SfuTrickleMsg) (map[string]interface{}, *nprotoo.Error) { +func trickle(msg *proto.SfuTrickleMsg) (map[string]interface{}, error) { log.Infof("trickle msg=%v", msg) peer := s.getPeer(msg.MID) if peer == nil { log.Warnf("peer not found, mid=%s", msg.MID) - return nil, util.NewNpError(415, "peer not found") + return nil, errors.New("peer not found") } if err := peer.Trickle(msg.Candidate); err != nil { - return nil, util.NewNpError(415, "error adding ice candidate") + return nil, errors.New("error adding ice candidate") } return nil, nil diff --git a/pkg/proto/biz.go b/pkg/proto/biz.go index b30d1d4c1..9dc484fb7 100644 --- a/pkg/proto/biz.go +++ b/pkg/proto/biz.go @@ -1,11 +1,44 @@ package proto import ( + "encoding/gob" "encoding/json" "github.com/pion/webrtc/v3" ) +func init() { + gob.Register(&FromClientJoinMsg{}) + gob.Register(&ToClientJoinMsg{}) + gob.Register(&ToClientPeerJoinMsg{}) + gob.Register(&ClientOfferMsg{}) + gob.Register(&ClientAnswerMsg{}) + gob.Register(&ClientTrickleMsg{}) + gob.Register(&FromClientLeaveMsg{}) + gob.Register(&FromClientBroadcastMsg{}) + gob.Register(&ToClientBroadcastMsg{}) + + gob.Register(&ToSfuJoinMsg{}) + gob.Register(&FromSfuJoinMsg{}) + gob.Register(&ToSfuLeaveMsg{}) + gob.Register(&SfuTrickleMsg{}) + gob.Register(&SfuOfferMsg{}) + gob.Register(&SfuAnswerMsg{}) + + gob.Register(&ToAvpProcessMsg{}) + + gob.Register(&IslbBroadcastMsg{}) + gob.Register(&ToIslbPeerJoinMsg{}) + gob.Register(&FromIslbPeerJoinMsg{}) + gob.Register(&IslbPeerLeaveMsg{}) + gob.Register(&ToIslbStreamAddMsg{}) + gob.Register(&FromIslbStreamAddMsg{}) + gob.Register(&ToIslbFindNodeMsg{}) + gob.Register(&FromIslbFindNodeMsg{}) + gob.Register(&ToIslbListMids{}) + gob.Register(&FromIslbListMids{}) +} + type Authenticatable interface { Room() RID Token() string @@ -33,22 +66,6 @@ type RTCInfo struct { Jsep webrtc.SessionDescription `json:"jsep"` } -type PublishOptions struct { - Codec string `json:"codec"` - Resolution string `json:"resolution"` - Bandwidth int `json:"bandwidth"` - Audio bool `json:"audio"` - Video bool `json:"video"` - Screen bool `json:"screen"` - TransportCC bool `json:"transportCC,omitempty"` - Description string `json:"description,omitempty"` -} - -type SubscribeOptions struct { - Bandwidth int `json:"bandwidth"` - TransportCC bool `json:"transportCC"` -} - type TrackMap map[string][]TrackInfo // Client <-> Biz messages. @@ -80,7 +97,13 @@ type ToClientPeerJoinMsg struct { Info json.RawMessage `json:"info"` } -type ClientNegotiationMsg struct { +type ClientOfferMsg struct { + RID RID `json:"rid"` + MID MID `json:"mid"` + RTCInfo +} + +type ClientAnswerMsg struct { RID RID `json:"rid"` MID MID `json:"mid"` RTCInfo @@ -130,7 +153,12 @@ type SfuTrickleMsg struct { Candidate webrtc.ICECandidateInit `json:"candidate"` } -type SfuNegotiationMsg struct { +type SfuOfferMsg struct { + MID MID `json:"mid"` + RTCInfo +} + +type SfuAnswerMsg struct { MID MID `json:"mid"` RTCInfo } @@ -194,11 +222,7 @@ type ToIslbFindNodeMsg struct { } type FromIslbFindNodeMsg struct { - RPCID string - EventID string - ID string - Name string - Service string + ID string } type ToIslbListMids struct { @@ -210,10 +234,21 @@ type FromIslbListMids struct { MIDs []MID `json:"mids"` } -type GetSFURPCParams struct { - RPCID string - EventID string - ID string - Name string - Service string +// CandidateForJSON for json.Marshal() => browser +func CandidateForJSON(c webrtc.ICECandidateInit) webrtc.ICECandidateInit { + if c.SDPMid == nil { + c.SDPMid = refString("0") + } + if c.SDPMLineIndex == nil { + c.SDPMLineIndex = refUint16(0) + } + return c +} + +func refString(s string) *string { + return &s +} + +func refUint16(i uint16) *uint16 { + return &i } diff --git a/pkg/proto/nats.go b/pkg/proto/nats.go new file mode 100644 index 000000000..0a97aeea3 --- /dev/null +++ b/pkg/proto/nats.go @@ -0,0 +1,175 @@ +package proto + +import ( + "bytes" + "encoding/gob" + "errors" + "time" + + log "github.com/pion/ion-log" + + "github.com/nats-io/nats.go" +) + +// rpcmsg is a structure used by Subscribe and Publish. +type rpcmsg struct { + Data interface{} +} + +// RPCError represents a error string for rpc +type RPCError struct { + Err string +} + +// newError create a RPCError instanse +func newError(err string) *RPCError { + return &RPCError{err} +} + +// MsgHandler is a callback function that processes messages delivered to +// asynchronous subscribers. +type MsgHandler func(msg interface{}) (interface{}, error) + +// NatsRPC represents a rpc base nats +type NatsRPC struct { + nc *nats.Conn +} + +// NewNatsRPC create a instanse and connect to nats server. +func NewNatsRPC(urls string) *NatsRPC { + r := &NatsRPC{} + r.Connect(urls) + return r +} + +// Connect to nats server. +func (r *NatsRPC) Connect(url string) { + // connect options + opts := []nats.Option{nats.Name("nats ion service")} + opts = r.setupConnOptions(opts) + + // connect to nats server + var err error + if r.nc, err = nats.Connect(url, opts...); err != nil { + log.Errorf("connect nats error: %v", err) + } +} + +// Close the connection to the server. +func (r *NatsRPC) Close() { + r.nc.Close() +} + +func (r *NatsRPC) setupConnOptions(opts []nats.Option) []nats.Option { + totalWait := 10 * time.Minute + reconnectDelay := time.Second + + opts = append(opts, nats.ReconnectWait(reconnectDelay)) + opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay))) + opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + if !nc.IsClosed() { + log.Infof("Disconnected due to: %s, will attempt reconnects for %.0fm", err, totalWait.Minutes()) + } + })) + opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) { + log.Infof("Reconnected [%s]", nc.ConnectedUrl()) + })) + opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) { + if !nc.IsClosed() { + log.Errorf("Exiting: no servers available") + } else { + log.Errorf("Exiting") + } + })) + return opts +} + +// Subscribe will express interest in the given subject. +// Messages will be delivered to the associated MsgHandler. +func (r *NatsRPC) Subscribe(subj string, handle MsgHandler) (*nats.Subscription, error) { + return r.nc.Subscribe(subj, func(msg *nats.Msg) { + var got rpcmsg + if err := Unmarshal(msg.Data, &got); err != nil { + log.Errorf("decode msg error: %v", err) + } + + result, err := handle(got.Data) + if err != nil { + result = newError(err.Error()) + } + + resp := &rpcmsg{Data: result} + + if msg.Reply != "" { + data, err := Marshal(resp) + if err != nil { + log.Errorf("marshal error: %v", err) + } + if err := msg.Respond(data); err != nil { + log.Errorf("respond error: %v", err) + } + } + }) +} + +// Request will send a request payload and deliver the response message, +// or an error, including a timeout if no message was received properly. +func (r *NatsRPC) Request(subj string, data interface{}) (interface{}, error) { + d, err := Marshal(&rpcmsg{Data: data}) + if err != nil { + return nil, err + } + + resp, err := r.nc.Request(subj, d, nats.DefaultTimeout) + if err != nil { + return nil, err + } + + var result rpcmsg + err = Unmarshal(resp.Data, &result) + if err != nil { + return nil, err + } + + if v, ok := result.Data.(*RPCError); ok { + err = errors.New(v.Err) + result.Data = nil + } + + return result.Data, err +} + +// Publish publishes the data argument to the given subject. The data +// argument is left untouched and needs to be correctly interpreted on +// the receiver. +func (r *NatsRPC) Publish(subj string, data interface{}) error { + d, err := Marshal(&rpcmsg{ + Data: data, + }) + if err != nil { + return err + } + return r.nc.Publish(subj, d) +} + +func init() { + gob.Register(&RPCError{}) +} + +// Unmarshal parses the encoded data and stores the result +// in the value pointed to by v +func Unmarshal(data []byte, v interface{}) error { + dec := gob.NewDecoder(bytes.NewBuffer(data)) + return dec.Decode(v) +} + +// Marshal encodes v and returns encoded data +func Marshal(v interface{}) ([]byte, error) { + buf := new(bytes.Buffer) + enc := gob.NewEncoder(buf) + err := enc.Encode(v) + if err != nil { + return []byte{}, err + } + return buf.Bytes(), nil +} diff --git a/pkg/proto/nats_test.go b/pkg/proto/nats_test.go new file mode 100644 index 000000000..be29eee77 --- /dev/null +++ b/pkg/proto/nats_test.go @@ -0,0 +1,116 @@ +package proto + +import ( + "encoding/gob" + "errors" + "fmt" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" +) + +func TestRequestString(t *testing.T) { + n := NewNatsRPC(nats.DefaultURL) + defer n.Close() + + sub, err := n.Subscribe("rpc-id", func(msg interface{}) (interface{}, error) { + fmt.Printf("request: %+v\n", msg) + assert.Equal(t, msg, "tommy") + return "joined", nil + }) + assert.NoError(t, err) + + resp, err := n.Request("rpc-id", "tommy") + fmt.Printf("request: resp=%+v, err=%+v\n", resp, err) + assert.NoError(t, err) + assert.Equal(t, resp, "joined") + + err = sub.Unsubscribe() + assert.NoError(t, err) +} + +func TestRequestStruct(t *testing.T) { + n := NewNatsRPC(nats.DefaultURL) + defer n.Close() + + type JoinReq struct { + ID int + Name string + } + + type JoinResp struct { + MID string + } + + gob.Register(&JoinReq{}) + gob.Register(&JoinResp{}) + + reqMsg := &JoinReq{ID: 1234, Name: "tommy"} + respMsg := &JoinResp{MID: "mid-1234"} + + sub, err := n.Subscribe("rpc-id", func(msg interface{}) (interface{}, error) { + fmt.Printf("request: %+v\n", msg) + assert.Equal(t, msg, reqMsg) + return respMsg, nil + }) + assert.NoError(t, err) + + resp, err := n.Request("rpc-id", reqMsg) + fmt.Printf("respone: resp=%+v, err=%+v\n", resp, err) + assert.NoError(t, err) + assert.Equal(t, resp, respMsg) + + err = sub.Unsubscribe() + assert.NoError(t, err) +} + +func TestPublish(t *testing.T) { + n := NewNatsRPC(nats.DefaultURL) + defer n.Close() + + done := make(chan bool) + + sub, err := n.Subscribe("rpc-id", func(msg interface{}) (interface{}, error) { + fmt.Printf("request: %+v\n", msg) + assert.Equal(t, msg, "tommy") + done <- true + return nil, nil + }) + assert.NoError(t, err) + + err = sub.AutoUnsubscribe(1) + assert.NoError(t, err) + + err = n.Publish("rpc-id", "tommy") + assert.NoError(t, err) + + select { + case <-done: + return + case <-time.After(time.Second): + t.Error("request timeout") + return + } +} + +func TestRequestError(t *testing.T) { + n := NewNatsRPC(nats.DefaultURL) + defer n.Close() + + sub, err := n.Subscribe("rpc-id", func(msg interface{}) (interface{}, error) { + fmt.Printf("request: %+v\n", msg) + assert.Equal(t, msg, "tommy") + return nil, errors.New("join faild") + }) + assert.NoError(t, err) + + resp, err := n.Request("rpc-id", "tommy") + fmt.Printf("request: resp=%+v, err=%+v\n", resp, err) + assert.Equal(t, resp, nil) + assert.Equal(t, err.Error(), "join faild") + + err = sub.Unsubscribe() + assert.NoError(t, err) +} diff --git a/pkg/signal/handle.go b/pkg/signal/handle.go index 7664d2038..788bcd776 100644 --- a/pkg/signal/handle.go +++ b/pkg/signal/handle.go @@ -100,13 +100,13 @@ func in(transport *transport.WebSocketTransport, request *http.Request) { for _, r := range rooms { // only remove if its the same peer. If newer peer joined before the cleanup, leave it. if r.GetPeer(peer.ID()) == peer { - if code > 1000 { - msgStr, _ := json.Marshal(proto.FromClientLeaveMsg{ - UID: proto.UID(peer.ID()), - RID: r.ID(), - }) - bizCall(proto.ClientLeave, peer, msgStr, accept, reject) - } + //if code > 1000 { + msgStr, _ := json.Marshal(proto.FromClientLeaveMsg{ + UID: proto.UID(peer.ID()), + RID: r.ID(), + }) + bizCall(proto.ClientLeave, peer, msgStr, accept, reject) + //} log.Infof("signal.in handleClose delete peer (%s) from room (%s)", peer.ID(), r.ID()) r.DelPeer(peer.ID()) } diff --git a/pkg/signal/peer.go b/pkg/signal/peer.go index 12a0cc31c..8c02bbcbd 100644 --- a/pkg/signal/peer.go +++ b/pkg/signal/peer.go @@ -19,8 +19,9 @@ func newPeer(id proto.UID, t *transport.WebSocketTransport, claims *Claims) *Pee type Peer struct { sync.Mutex peer.Peer - claims *Claims - closed bool + claims *Claims + closed bool + onCloseFun func() } // ID user/peer id @@ -55,5 +56,13 @@ func (p *Peer) Close() { return } p.closed = true + if p.onCloseFun != nil { + p.onCloseFun() + } p.Peer.Close() } + +// SetCloseFun sets a handler that is called when the peer close +func (p *Peer) SetCloseFun(f func()) { + p.onCloseFun = f +} diff --git a/pkg/signal/room.go b/pkg/signal/room.go index 2ce5a8d4e..8e79c3ac3 100644 --- a/pkg/signal/room.go +++ b/pkg/signal/room.go @@ -124,7 +124,7 @@ func GetPeer(rid proto.RID, uid proto.UID) *Peer { log.Infof("GetPeer rid=%s uid=%s", rid, uid) r := GetRoom(rid) if r == nil { - log.Infof("room not exits, rid=%s uid=%s", rid, uid) + //log.Infof("room not exits, rid=%s uid=%s", rid, uid) return nil } return r.GetPeer(uid) diff --git a/pkg/util/util.go b/pkg/util/util.go index 965a6ba02..22af4346b 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -6,7 +6,6 @@ import ( "runtime/debug" "strings" - nprotoo "github.com/cloudwebrtc/nats-protoo" log "github.com/pion/ion-log" ) @@ -53,10 +52,3 @@ func Recover(flag string) { debug.PrintStack() } } -func NewNpError(code int, reason string) *nprotoo.Error { - err := nprotoo.Error{ - Code: code, - Reason: reason, - } - return &err -}