Skip to content

Commit

Permalink
feat(proto): add nats-rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
cgojin committed Oct 30, 2020
1 parent ef15acd commit 4401e76
Show file tree
Hide file tree
Showing 28 changed files with 901 additions and 677 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ cmd/islb/islb
cmd/avp/avp
./ion
.idea
.vscode
node_modules/
cover.out

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
+ **ION** is a secure, self-hosted [WebRTC](https://webrtc.org/) video conferencing [SFU (what is an SFU?)](https://testrtc.com/different-multiparty-video-conferencing/), that you can host today in the cloud or on-premise.
+ This **ION** repository contains the backend cluster services, so you also need to deploy the [web app](https://github.com/pion/ion-app-web) or install a [flutter client for web, desktop or mobile](https://github.com/pion/ion-app-flutter)
+ **ION**'s mission is to deliver world-class tools for creating communication systems, and many people build their projects on top of it:
+ [**ION** Cluster](https://github.com/pion/ion) (This project!) is composed of two services [`biz` + `ISLB` (see glossary)](docs/glossary.md) and uses `NATS`, `etcd` and `redis` as databases to administer room membership, manage text chat, verify JWT authentication and assign clients to the proper SFU in a multi-datacenter architecture. **ION** Cluster also builds its own version of `ion-sfu` binary, which is lightly adapted to use `NATS-protoo` for signaling (which is how `biz` and `ISLB` trade messages internally).
+ [**ION** Cluster](https://github.com/pion/ion) (This project!) is composed of two services [`biz` + `ISLB` (see glossary)](docs/glossary.md) and uses `NATS`, `etcd` and `redis` as databases to administer room membership, manage text chat, verify JWT authentication and assign clients to the proper SFU in a multi-datacenter architecture. **ION** Cluster also builds its own version of `ion-sfu` binary, which is lightly adapted to use `NatsRPC` for signaling (which is how `biz` and `ISLB` trade messages internally).
+ [`ion-sfu` (external)](https://github.com/pion/ion-sfu), which handles WebRTC streams, can be used as a standalone SFU for designing custom chat experiences or implementing your own scaling architecture. `ion-sfu` is equally capable of forwarding Video, Audio and DataChannel tracks, and can handle arbitrary non-media data transport.
+ [`ion-avp` Audio/Video Processing](https://github.com/pion/ion-avp) (WIP) is a sidecar utility for running realtime AV processing jobs, including `write-to-disk`, `ffmpeg` and `openCV`
+ *`ion-live` LIVE node (planned)* - A feed streaming gateway for supporting publishing to and from SIP/RTMP/HLS/RTSP endpoints
Expand Down
5 changes: 1 addition & 4 deletions cmd/avp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func main() {

serviceNode := discovery.NewServiceNode(conf.Etcd.Addrs, conf.Global.Dc)
serviceNode.RegisterNode("avp", "node-avp", "avp-channel-id")

rpcID := serviceNode.GetRPCChannel()
eventID := serviceNode.GetEventChannel()
avp.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], rpcID, eventID, conf.Nats.URL)
avp.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], conf.Nats.URL)

select {}
}
7 changes: 2 additions & 5 deletions cmd/biz/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,10 @@ func main() {

serviceNode := discovery.NewServiceNode(conf.Etcd.Addrs, conf.Global.Dc)
serviceNode.RegisterNode("biz", "node-biz", "biz-channel-id")

rpcID := serviceNode.GetRPCChannel()
eventID := serviceNode.GetEventChannel()
biz.Init(conf.Global.Dc, serviceNode.NodeInfo().ID, rpcID, eventID, conf.Nats.URL, conf.Signal.AuthRoom, conf.Avp.Elements)
biz.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], conf.Nats.URL, conf.Signal.AuthRoom, conf.Avp.Elements)

serviceWatcher := discovery.NewServiceWatcher(conf.Etcd.Addrs, conf.Global.Dc)
go serviceWatcher.WatchServiceNode("islb", biz.WatchServiceNodes)
go serviceWatcher.WatchServiceNode("islb", biz.WatchIslbNodes)

defer close()
select {}
Expand Down
5 changes: 2 additions & 3 deletions cmd/islb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ func main() {
Pwd: conf.Redis.Pwd,
DB: conf.Redis.DB,
}
rpcID := serviceNode.GetRPCChannel()
eventID := serviceNode.GetEventChannel()
islb.Init(conf.Global.Dc, serviceNode.NodeInfo().ID, rpcID, eventID, redisCfg, conf.Etcd.Addrs, conf.Nats.URL)

islb.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], redisCfg, conf.Etcd.Addrs, conf.Nats.URL)

serviceWatcher := discovery.NewServiceWatcher(conf.Etcd.Addrs, conf.Global.Dc)
go serviceWatcher.WatchServiceNode("", islb.WatchServiceNodes)
Expand Down
4 changes: 1 addition & 3 deletions cmd/sfu/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ func main() {
serviceNode := discovery.NewServiceNode(conf.Etcd.Addrs, conf.Global.Dc)
serviceNode.RegisterNode("sfu", "node-sfu", "sfu-channel-id")

rpcID := serviceNode.GetRPCChannel()
eventID := serviceNode.GetEventChannel()
sfu.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], rpcID, eventID, conf.Nats.URL)
sfu.Init(conf.Global.Dc, serviceNode.NodeInfo().Info["id"], conf.Nats.URL)
select {}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
require (
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/cloudwebrtc/go-protoo v0.0.0-20200926140535-79ecde67b906
github.com/cloudwebrtc/nats-protoo v0.0.0-20200604135451-87b43396e8de
github.com/coreos/etcd v3.3.25+incompatible // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-ole/go-ole v1.2.4 // indirect
Expand All @@ -16,6 +15,7 @@ require (
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/uuid v1.1.2
github.com/gorilla/websocket v1.4.2
github.com/nats-io/nats.go v1.10.0
github.com/notedit/sdp v0.0.4
github.com/pion/ion-avp v1.0.39
github.com/pion/ion-log v0.0.0-20201024224650-e6b94dfeaf1d
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9 h1:xz6Nv3zcw
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9/go.mod h1:2wSM9zJkl1UQEFZgSd68NfCgRz1VL1jzy/RjCg+ULrs=
github.com/cloudwebrtc/go-protoo v0.0.0-20200926140535-79ecde67b906 h1:CXJrfUVNhSAKWnb+oSvd8MBCQJHK6+RS7jLvx2z3ba0=
github.com/cloudwebrtc/go-protoo v0.0.0-20200926140535-79ecde67b906/go.mod h1:Q0DiItmsD5iCBdeID9Xu03ok8bemc78XJ+0rYATQbuQ=
github.com/cloudwebrtc/nats-protoo v0.0.0-20200604135451-87b43396e8de h1:yPvjphU5iEeYTOOPzSwU9TNoy0nOwtv5LYMZMVtOrPY=
github.com/cloudwebrtc/nats-protoo v0.0.0-20200604135451-87b43396e8de/go.mod h1:zwKwTqbrcBl9o2AHopHYlMh4KM3wMQHYeR6TrtoRCQ0=
github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ=
Expand Down Expand Up @@ -347,7 +345,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.14.3/go.mod h1:3WXPzbXEEliJ+a6UFE4vhIxV8qR1EML6ngzP9ug4eYg=
github.com/rs/zerolog v1.17.2/go.mod h1:9nvC1axdVrAHcu/s9taAVfBuIdTZLVQmKQyvrUjF5+I=
github.com/rs/zerolog v1.20.0 h1:38k9hgtUBdxFwE34yS8rTHmHBa4eN16E4DJlv177LNs=
github.com/rs/zerolog v1.20.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
2 changes: 1 addition & 1 deletion pkg/discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func decode(ds []byte) map[string]string {
var s map[string]string
err := json.Unmarshal(ds, &s)
if err != nil {
log.Errorf("service.decode err => %+v", err)
log.Errorf("service.decode err => %v", err)
return nil
}
return s
Expand Down
14 changes: 6 additions & 8 deletions pkg/node/avp/init.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package avp

import (
nprotoo "github.com/cloudwebrtc/nats-protoo"
)
import "github.com/pion/ion/pkg/proto"

var (
//nolint:unused
dc = "default"
//nolint:unused
nid = "avp-unkown-node-id"
protoo *nprotoo.NatsProtoo
nid = "avp-unkown-node-id"
nrpc *proto.NatsRPC
)

// Init func
func Init(dcID, nodeID, rpcID, eventID, natsURL string) {
func Init(dcID, nodeID, natsURL string) {
dc = dcID
nid = nodeID
protoo = nprotoo.NewNatsProtoo(natsURL)
handleRequest(rpcID)
nrpc = proto.NewNatsRPC(natsURL)
handleRequest(nid)
}
39 changes: 15 additions & 24 deletions pkg/node/avp/internal.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,31 @@
package avp

import (
"fmt"
"errors"

nprotoo "github.com/cloudwebrtc/nats-protoo"
log "github.com/pion/ion-log"
"github.com/pion/ion/pkg/proto"
"github.com/pion/ion/pkg/util"
)

func handleRequest(rpcID string) {
log.Infof("handleRequest: rpcID => [%v]", rpcID)
protoo.OnRequest(rpcID, func(request nprotoo.Request, accept nprotoo.RespondFunc, reject nprotoo.RejectFunc) {
method := request.Method
data := request.Data
log.Infof("handleRequest: method => %s, data => %s", method, data)
log.Infof("handleRequest: rpcID => [%s]", rpcID)

var result interface{}
errResult := util.NewNpError(400, fmt.Sprintf("Unknown method [%s]", method))
_, err := nrpc.Subscribe(rpcID, func(msg interface{}) (interface{}, error) {
log.Infof("handleRequest: %T, %+v", msg, msg)

switch method {
case proto.AvpProcess:
var msg proto.ToAvpProcessMsg
if errResult = data.Unmarshal(&msg); errResult != nil {
break
switch v := msg.(type) {
case *proto.ToAvpProcessMsg:
if err := s.Process(v.Addr, v.PID, v.SID, v.TID, v.EID, v.Config); err != nil {
return nil, err
}
if err := s.Process(msg.Addr, msg.PID, msg.SID, msg.TID, msg.EID, msg.Config); err != nil {
errResult = util.NewNpError(500, err.Error())
}
errResult = nil
default:
return nil, errors.New("unkonw message")
}

if errResult != nil {
reject(errResult.Code, errResult.Reason)
} else {
accept(result)
}
return nil, nil
})

if err != nil {
log.Errorf("nrpc subscribe error: %v", err)
}
}
Loading

0 comments on commit 4401e76

Please sign in to comment.