Skip to content
This repository has been archived by the owner on Dec 2, 2023. It is now read-only.
/ ion Public archive
  • Rate limit · GitHub

    Whoa there!

    You have triggered an abuse detection mechanism.

    Please wait a few minutes before you try again;
    in some cases this may take up to an hour.

  • Notifications You must be signed in to change notification settings
  • Fork 518

Fix lint errors #542

Merged
merged 2 commits into from
Apr 11, 2021
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix lint errors.
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

cloudwebrtc committed Apr 11, 2021
commit acb2c7cbb81741b65a115e1d251540656bda541e
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@ require (
github.com/cloudwebrtc/nats-grpc v0.1.3
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/go-redis/redis/v7 v7.4.0
github.com/golang/protobuf v1.4.3
github.com/improbable-eng/grpc-web v0.13.0
github.com/nats-io/nats-server/v2 v2.1.9
github.com/nats-io/nats.go v1.10.0
@@ -22,6 +21,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/tj/assert v0.0.3
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4 // indirect
google.golang.org/grpc v1.35.0
google.golang.org/protobuf v1.25.0
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -583,6 +583,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210217090653-ed5674b6da4a h1:m4knbKtdWq+rPB3TE+ApaRzkETZngkKdhYjvTnnRq4s=
golang.org/x/sys v0.0.0-20210217090653-ed5674b6da4a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4 h1:EZ2mChiOa8udjfp6rRmswTbtZN/QzUQp4ptM4rnjHvc=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
20 changes: 16 additions & 4 deletions pkg/ion/node_test.go
Original file line number Diff line number Diff line change
@@ -36,15 +36,18 @@ func init() {
func TestWatch(t *testing.T) {
n := NewNode(nid)

registry.Listen(func(action string, node discovery.Node) {
err := registry.Listen(func(action string, node discovery.Node) {
log.Debugf("handleNode: service %v, action %v => id %v, RPC %v", node.Service, action, node.ID(), node.RPC)
assert.Equal(t, node.NID, nid)
assert.Equal(t, node.Service, proto.ServiceBIZ)
wg.Done()
})
if err != nil {
t.Error(err)
}

wg.Add(1)
err := n.Start(natsURL)
err = n.Start(natsURL)
if err != nil {
t.Error(err)
}
@@ -60,11 +63,20 @@ func TestWatch(t *testing.T) {
},
}

go n.KeepAlive(node)
go func() {
err := n.KeepAlive(node)
if err != nil {
t.Error(err)
}
}()

wg.Wait()
assert.NotEmpty(t, n.ServiceRegistrar())

n.Watch(proto.ServiceBIZ)
err = n.Watch(proto.ServiceBIZ)
if err != nil {
t.Error(err)
}

n.Close()
}
15 changes: 12 additions & 3 deletions pkg/node/avp/avp.go
Original file line number Diff line number Diff line change
@@ -43,7 +43,6 @@ type Config struct {

// AVP represents avp node
type AVP struct {
config Config
ion.Node
s *avpServer
}
@@ -84,7 +83,12 @@ func (a *AVP) Start(conf Config) error {
},
}

go a.Node.KeepAlive(node)
go func() {
err := a.Node.KeepAlive(node)
if err != nil {
log.Errorf("avp.Node.KeepAlive: error => %v", err)
}
}()

elems := make(map[string]iavp.ElementFun)
if conf.Element.Webmsaver.On {
@@ -105,7 +109,12 @@ func (a *AVP) Start(conf Config) error {
pb.RegisterAVPServer(a.Node.ServiceRegistrar(), a.s)

//Watch ISLB nodes.
go a.Node.Watch(proto.ServiceISLB)
go func() {
err := a.Node.Watch(proto.ServiceISLB)
if err != nil {
log.Errorf("avp.Node.Watch: error => %v", err)
}
}()

return nil
}
24 changes: 2 additions & 22 deletions pkg/node/avp/server.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"io"
"sync"

"github.com/cloudwebrtc/nats-discovery/pkg/discovery"
pb "github.com/pion/ion-avp/cmd/signal/grpc/proto"
avp "github.com/pion/ion-avp/pkg"
log "github.com/pion/ion-log"
@@ -62,31 +61,12 @@ func (a *AVPProcesser) Process(ctx context.Context, addr, pid, sid, tid, eid str

type avpServer struct {
pb.UnimplementedAVPServer
avp *AVPProcesser
nodeLock sync.RWMutex
nodes map[string]*discovery.Node
avp *AVPProcesser
}

func newAVPServer(conf avp.Config, elems map[string]avp.ElementFun) *avpServer {
return &avpServer{
avp: NewAVPProcesser(conf, elems),
nodes: make(map[string]*discovery.Node),
}
}

// watchIslbNodes watch islb nodes up/down
func (a *avpServer) watchIslbNodes(state discovery.NodeState, node *discovery.Node) {
a.nodeLock.Lock()
defer a.nodeLock.Unlock()
id := node.NID
if state == discovery.NodeUp {
log.Infof("islb node %v up", id)
if _, found := a.nodes[id]; !found {
a.nodes[id] = node
}
} else if state == discovery.NodeDown {
log.Infof("islb node %v down", id)
delete(a.nodes, id)
avp: NewAVPProcesser(conf, elems),
}
}

15 changes: 12 additions & 3 deletions pkg/node/biz/biz.go
Original file line number Diff line number Diff line change
@@ -96,11 +96,20 @@ func (b *BIZ) Start(conf Config) error {
},
}

go b.Node.KeepAlive(node)
go func() {
err := b.Node.KeepAlive(node)
if err != nil {
log.Errorf("biz.Node.KeepAlive(%v) error %v", b.Node.NID, err)
}
}()

//Watch ISLB nodes.
go b.Node.Watch(proto.ServiceISLB)

go func() {
err := b.Node.Watch(proto.ServiceISLB)
if err != nil {
log.Errorf("biz.Node.Watch(proto.ServiceISLB) error %v", err)
}
}()
return nil
}

25 changes: 21 additions & 4 deletions pkg/node/biz/biz_test.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,12 @@ func init() {
bs = newBizServer(bn, dc, nid, []string{}, nc)

//Watch ISLB nodes.
go bn.Node.Watch(proto.ServiceISLB)
go func() {
err := bn.Node.Watch(proto.ServiceISLB)
if err != nil {
log.Panicf("failed to Watch: %v", err)
}
}()

pb.RegisterBizServer(s, bs)

@@ -90,7 +95,7 @@ func TestJBizJoin(t *testing.T) {
t.Error(err)
}

stream.Send(&pb.SignalRequest{
err = stream.Send(&pb.SignalRequest{
Payload: &pb.SignalRequest_Join{
Join: &pb.Join{
Peer: &ion.Peer{
@@ -101,6 +106,10 @@ func TestJBizJoin(t *testing.T) {
},
})

if err != nil {
t.Error(err)
}

reply, err := stream.Recv()
if err != nil {
t.Error(err)
@@ -118,7 +127,7 @@ func TestJBizJoin(t *testing.T) {
}

func TestBizMessage(t *testing.T) {
stream.Send(&pb.SignalRequest{
err := stream.Send(&pb.SignalRequest{
Payload: &pb.SignalRequest_Msg{
Msg: &ion.Message{
From: uid,
@@ -128,6 +137,10 @@ func TestBizMessage(t *testing.T) {
},
})

if err != nil {
t.Error(err)
}

reply, err := stream.Recv()
if err != nil {
t.Error(err)
@@ -138,14 +151,18 @@ func TestBizMessage(t *testing.T) {
}

func TestBizLeave(t *testing.T) {
stream.Send(&pb.SignalRequest{
err := stream.Send(&pb.SignalRequest{
Payload: &pb.SignalRequest_Leave{
Leave: &pb.Leave{
Uid: uid,
},
},
})

if err != nil {
t.Error(err)
}

reply, err := stream.Recv()
if err != nil {
t.Error(err)
10 changes: 8 additions & 2 deletions pkg/node/biz/room.go
Original file line number Diff line number Diff line change
@@ -54,10 +54,16 @@ func (r *Room) addPeer(p *Peer) {
Info: peer.info,
},
}
p.sendPeerEvent(event)
err := p.sendPeerEvent(event)
if err != nil {
log.Errorf("p.sendPeerEvent() failed %v", err)
}

if peer.lastStreamEvent != nil {
p.sendStreamEvent(peer.lastStreamEvent)
err := p.sendStreamEvent(peer.lastStreamEvent)
if err != nil {
log.Errorf("p.sendStreamEvent() failed %v", err)
}
}
}

26 changes: 21 additions & 5 deletions pkg/node/biz/server.go
Original file line number Diff line number Diff line change
@@ -73,10 +73,13 @@ func (s *BizServer) watchISLBEvent(nid string, sid string) error {
if err != nil {
return err
}
stream.Send(&islb.WatchRequest{
err = stream.Send(&islb.WatchRequest{
Nid: nid,
Sid: sid,
})
if err != nil {
return err
}

go func() {
for {
@@ -149,7 +152,10 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
if !ok {
return io.EOF
}
stream.Send(reply)
err := stream.Send(reply)
if err != nil {
return err
}
case req, ok := <-reqCh:
if !ok {
return io.EOF
@@ -191,7 +197,10 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
reason = fmt.Sprintf("islbcli.FindNode(serivce = sfu, sid = %v) err %v", sid, err)
}

s.watchISLBEvent(nid, sid)
err = s.watchISLBEvent(nid, sid)
if err != nil {
log.Errorf("s.watchISLBEvent(req) failed %v", err)
}
}
if r != nil {
peer = NewPeer(sid, uid, payload.Join.Peer.Info, repCh)
@@ -203,14 +212,18 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
reason = fmt.Sprintf("join [sid=%v] islb node not found", sid)
}

stream.Send(&biz.SignalReply{
err := stream.Send(&biz.SignalReply{
Payload: &biz.SignalReply_JoinReply{
JoinReply: &biz.JoinReply{
Success: success,
Reason: reason,
},
},
})

if err != nil {
log.Errorf("stream.Send(&biz.SignalReply) failed %v", err)
}
case *biz.SignalRequest_Leave:
uid := payload.Leave.Uid
if peer != nil && peer.uid == uid {
@@ -223,13 +236,16 @@ func (s *BizServer) Signal(stream biz.Biz_SignalServer) error {
r = nil
}

stream.Send(&biz.SignalReply{
err := stream.Send(&biz.SignalReply{
Payload: &biz.SignalReply_LeaveReply{
LeaveReply: &biz.LeaveReply{
Reason: "closed",
},
},
})
if err != nil {
log.Errorf("stream.Send(&biz.SignalReply) failed %v", err)
}
}
case *biz.SignalRequest_Msg:
log.Debugf("Message: from: %v => to: %v, data: %v", payload.Msg.From, payload.Msg.To, payload.Msg.Data)
22 changes: 17 additions & 5 deletions pkg/node/biz/sfu.go
Original file line number Diff line number Diff line change
@@ -17,15 +17,18 @@ type SFUSignalBridge struct {

//Signal bridge SFU signaling between client and sfu node.
func (s *SFUSignalBridge) Signal(sstream sfu.SFU_SignalServer) error {
var peer *Peer = nil
var peer *Peer
var cstream sfu.SFU_SignalClient = nil
reqCh := make(chan *sfu.SignalRequest)
repCh := make(chan *sfu.SignalReply)
errCh := make(chan error)

defer func() {
if cstream != nil {
cstream.CloseSend()
err := cstream.CloseSend()
if err != nil {
log.Errorf("cstream.CloseSend() failed %v", err)
}
}
close(errCh)
log.Infof("SFU.Signal loop done")
@@ -54,7 +57,10 @@ func (s *SFUSignalBridge) Signal(sstream sfu.SFU_SignalServer) error {
}

if cstream != nil {
cstream.Send(req)
err := cstream.Send(req)
if err != nil {
log.Errorf("cstream.Send(req) failed %v", err)
}
break
}

@@ -87,7 +93,10 @@ func (s *SFUSignalBridge) Signal(sstream sfu.SFU_SignalServer) error {
}
}()

cstream.Send(req)
err = cstream.Send(req)
if err != nil {
return fmt.Errorf("cstream.Send(req) failed %v", err)
}
break
} else {
return fmt.Errorf("peer [%v] not found", payload.Join.Uid)
@@ -98,7 +107,10 @@ func (s *SFUSignalBridge) Signal(sstream sfu.SFU_SignalServer) error {
}
case reply, ok := <-repCh:
if ok {
sstream.Send(reply)
err := sstream.Send(reply)
if err != nil {
return fmt.Errorf("sstream.Send(reply) failed %v", err)
}
break
}
return io.EOF
Rate limit · GitHub

Whoa there!

You have triggered an abuse detection mechanism.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.