Skip to content

Commit

Permalink
(fix) clean redis peer leave
Browse files Browse the repository at this point in the history
  • Loading branch information
adwpc committed Nov 19, 2021
1 parent dcf4f1a commit 665453b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 11 deletions.
11 changes: 10 additions & 1 deletion apps/room/server/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type Room struct {
peers map[string]*Peer
info *room.Room
update time.Time
redis *db.Redis
}

type RoomServer struct {
Expand Down Expand Up @@ -220,11 +221,12 @@ func (s *RoomServer) Close() {
}

// newRoom creates a new room instance
func newRoom(sid string) *Room {
func newRoom(sid string, redis *db.Redis) *Room {
r := &Room{
sid: sid,
peers: make(map[string]*Peer),
update: time.Now(),
redis: redis,
}
return r
}
Expand Down Expand Up @@ -299,6 +301,13 @@ func (r *Room) delPeer(p *Peer) int {
Peer: p.info,
State: room.PeerState_LEAVE,
}

key := util.GetRedisPeerKey(p.info.Sid, uid)
err := r.redis.Del(key)
if err != nil {
log.Errorf("err=%v", err)
}

r.broadcastPeerEvent(event)

return peerCount
Expand Down
4 changes: 3 additions & 1 deletion apps/room/server/room_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func (s *RoomService) createRoom(sid string) *Room {
if r := s.rooms[sid]; r != nil {
return r
}
r := newRoom(sid)
r := newRoom(sid, s.redis)
s.rooms[sid] = r
return r
}
Expand Down Expand Up @@ -516,7 +516,9 @@ func (s *RoomService) stat() {
//clean after room is clean and expired
duration := time.Since(room.update)
if duration > roomExpire && room.count() == 0 {
s.roomLock.Lock()
delete(s.rooms, sid)
s.roomLock.Unlock()
}
info += fmt.Sprintf("room: %s\npeers: %d\n", sid, room.count())
}
Expand Down
53 changes: 44 additions & 9 deletions apps/room/server/room_signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (s *RoomSignalService) Signal(stream room.RoomSignal_SignalServer) error {
switch payload := req.Payload.(type) {
case *room.Request_Join:
log.Infof("[C->S]=%+v", payload)
reply, peer, err := s.Join(payload)
reply, peer, err := s.Join(payload, stream)
if err != nil {
log.Errorf("Join err: %v", err)
return err
}
peer.sig = stream

p = peer
log.Infof("[S->C]=%+v", reply)
err = stream.Send(&room.Reply{Payload: reply})
Expand Down Expand Up @@ -95,8 +95,10 @@ func (s *RoomSignalService) Signal(stream room.RoomSignal_SignalServer) error {
}
}

func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer, error) {
func (s *RoomSignalService) Join(in *room.Request_Join, stream room.RoomSignal_SignalServer) (*room.Reply_Join, *Peer, error) {
pinfo := in.Join.Peer
sid := pinfo.Sid
uid := pinfo.Uid

if pinfo == nil || pinfo.Sid == "" && pinfo.Uid == "" {
reply := &room.Reply_Join{
Expand All @@ -111,16 +113,12 @@ func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer
}
return reply, nil, status.Errorf(codes.Internal, "sid/uid is empty")
}
key := util.GetRedisRoomKey(pinfo.Sid)
infos := s.rs.redis.HGetAll(key)
sid := infos["sid"]
uid := infos["uid"]

key := util.GetRedisRoomKey(sid)
// create in redis if room not exist
if sid == "" {
// store room info
sid = pinfo.Sid
err := s.rs.redis.HMSetTTL(roomRedisExpire, key, "sid", pinfo.Sid, "name", pinfo.DisplayName,
err := s.rs.redis.HMSetTTL(roomRedisExpire, key, "sid", sid, "name", pinfo.DisplayName,
"password", "", "description", "", "lock", "0")
if err != nil {
reply := &room.Reply_Join{
Expand Down Expand Up @@ -168,6 +166,7 @@ func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer

peer = NewPeer()
peer.info = pinfo
peer.sig = stream
r.addPeer(peer)
// TODO
/*
Expand Down Expand Up @@ -205,6 +204,42 @@ func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer
},
}

// find peer in room
key = util.GetRedisPeersPrefixKey(sid)
peersKeys := s.rs.redis.Keys(key)

for _, pkey := range peersKeys {
res := s.rs.redis.HGetAll(pkey)
sid = res["sid"]
uid = res["uid"]
if sid == "" || uid == "" || uid == pinfo.Uid {
continue
}
key = util.GetRedisPeerKey(sid, uid)
res = s.rs.redis.HGetAll(key)
if len(res) != 0 {
info := &room.Peer{
Sid: res["sid"],
Uid: res["uid"],
DisplayName: res["name"],
ExtraInfo: []byte(res["info"]),
Role: room.Role(room.Role_value[res["role"]]),
Protocol: room.Protocol(room.Protocol_value[res["protocol"]]),
Avatar: res["avatar"],
Direction: room.Peer_Direction(room.Peer_Direction_value["direction"]),
Vendor: res["vendor"],
}

err := peer.sendPeerEvent(&room.PeerEvent{
State: room.PeerState_JOIN,
Peer: info,
})

if err != nil {
log.Errorf("signal send peer event error: %v", err)
}
}
}
return reply, peer, nil
}

Expand Down

0 comments on commit 665453b

Please sign in to comment.