From 665453b167387b54fab3259e07ef6ce6992459ee Mon Sep 17 00:00:00 2001 From: adwpc Date: Fri, 19 Nov 2021 16:37:45 +0800 Subject: [PATCH] (fix) clean redis peer leave --- apps/room/server/room.go | 11 ++++++- apps/room/server/room_service.go | 4 ++- apps/room/server/room_signal.go | 53 ++++++++++++++++++++++++++------ 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/apps/room/server/room.go b/apps/room/server/room.go index 9a2328b74..37788cccc 100644 --- a/apps/room/server/room.go +++ b/apps/room/server/room.go @@ -86,6 +86,7 @@ type Room struct { peers map[string]*Peer info *room.Room update time.Time + redis *db.Redis } type RoomServer struct { @@ -220,11 +221,12 @@ func (s *RoomServer) Close() { } // newRoom creates a new room instance -func newRoom(sid string) *Room { +func newRoom(sid string, redis *db.Redis) *Room { r := &Room{ sid: sid, peers: make(map[string]*Peer), update: time.Now(), + redis: redis, } return r } @@ -299,6 +301,13 @@ func (r *Room) delPeer(p *Peer) int { Peer: p.info, State: room.PeerState_LEAVE, } + + key := util.GetRedisPeerKey(p.info.Sid, uid) + err := r.redis.Del(key) + if err != nil { + log.Errorf("err=%v", err) + } + r.broadcastPeerEvent(event) return peerCount diff --git a/apps/room/server/room_service.go b/apps/room/server/room_service.go index 6f5c940e2..e010ef5ee 100644 --- a/apps/room/server/room_service.go +++ b/apps/room/server/room_service.go @@ -478,7 +478,7 @@ func (s *RoomService) createRoom(sid string) *Room { if r := s.rooms[sid]; r != nil { return r } - r := newRoom(sid) + r := newRoom(sid, s.redis) s.rooms[sid] = r return r } @@ -516,7 +516,9 @@ func (s *RoomService) stat() { //clean after room is clean and expired duration := time.Since(room.update) if duration > roomExpire && room.count() == 0 { + s.roomLock.Lock() delete(s.rooms, sid) + s.roomLock.Unlock() } info += fmt.Sprintf("room: %s\npeers: %d\n", sid, room.count()) } diff --git a/apps/room/server/room_signal.go b/apps/room/server/room_signal.go index 3bb40773b..ec71d14d7 100644 --- a/apps/room/server/room_signal.go +++ b/apps/room/server/room_signal.go @@ -53,12 +53,12 @@ func (s *RoomSignalService) Signal(stream room.RoomSignal_SignalServer) error { switch payload := req.Payload.(type) { case *room.Request_Join: log.Infof("[C->S]=%+v", payload) - reply, peer, err := s.Join(payload) + reply, peer, err := s.Join(payload, stream) if err != nil { log.Errorf("Join err: %v", err) return err } - peer.sig = stream + p = peer log.Infof("[S->C]=%+v", reply) err = stream.Send(&room.Reply{Payload: reply}) @@ -95,8 +95,10 @@ func (s *RoomSignalService) Signal(stream room.RoomSignal_SignalServer) error { } } -func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer, error) { +func (s *RoomSignalService) Join(in *room.Request_Join, stream room.RoomSignal_SignalServer) (*room.Reply_Join, *Peer, error) { pinfo := in.Join.Peer + sid := pinfo.Sid + uid := pinfo.Uid if pinfo == nil || pinfo.Sid == "" && pinfo.Uid == "" { reply := &room.Reply_Join{ @@ -111,16 +113,12 @@ func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer } return reply, nil, status.Errorf(codes.Internal, "sid/uid is empty") } - key := util.GetRedisRoomKey(pinfo.Sid) - infos := s.rs.redis.HGetAll(key) - sid := infos["sid"] - uid := infos["uid"] - + key := util.GetRedisRoomKey(sid) // create in redis if room not exist if sid == "" { // store room info sid = pinfo.Sid - err := s.rs.redis.HMSetTTL(roomRedisExpire, key, "sid", pinfo.Sid, "name", pinfo.DisplayName, + err := s.rs.redis.HMSetTTL(roomRedisExpire, key, "sid", sid, "name", pinfo.DisplayName, "password", "", "description", "", "lock", "0") if err != nil { reply := &room.Reply_Join{ @@ -168,6 +166,7 @@ func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer peer = NewPeer() peer.info = pinfo + peer.sig = stream r.addPeer(peer) // TODO /* @@ -205,6 +204,42 @@ func (s *RoomSignalService) Join(in *room.Request_Join) (*room.Reply_Join, *Peer }, } + // find peer in room + key = util.GetRedisPeersPrefixKey(sid) + peersKeys := s.rs.redis.Keys(key) + + for _, pkey := range peersKeys { + res := s.rs.redis.HGetAll(pkey) + sid = res["sid"] + uid = res["uid"] + if sid == "" || uid == "" || uid == pinfo.Uid { + continue + } + key = util.GetRedisPeerKey(sid, uid) + res = s.rs.redis.HGetAll(key) + if len(res) != 0 { + info := &room.Peer{ + Sid: res["sid"], + Uid: res["uid"], + DisplayName: res["name"], + ExtraInfo: []byte(res["info"]), + Role: room.Role(room.Role_value[res["role"]]), + Protocol: room.Protocol(room.Protocol_value[res["protocol"]]), + Avatar: res["avatar"], + Direction: room.Peer_Direction(room.Peer_Direction_value["direction"]), + Vendor: res["vendor"], + } + + err := peer.sendPeerEvent(&room.PeerEvent{ + State: room.PeerState_JOIN, + Peer: info, + }) + + if err != nil { + log.Errorf("signal send peer event error: %v", err) + } + } + } return reply, peer, nil }