From 617138ebd7c63cc67133f637c5aa0bf3bf77b93c Mon Sep 17 00:00:00 2001 From: or-else Date: Mon, 8 Apr 2019 14:56:17 +0300 Subject: [PATCH] prevent unbounded growth of user cache --- server/hub.go | 3 +- server/topic.go | 24 ++++++++++------ server/user.go | 76 ++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 87 insertions(+), 16 deletions(-) diff --git a/server/hub.go b/server/hub.go index cc38fd710..7b050e0e9 100644 --- a/server/hub.go +++ b/server/hub.go @@ -799,6 +799,7 @@ func topicInit(sreg *sessionJoin, h *Hub) { statsInc("LiveTopics", 1) statsInc("TotalTopics", 1) + usersRegisterTopic(t, types.ZeroUid, true) go t.run(h) @@ -889,7 +890,6 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea h.topicDel(topic) t.exit <- &shutDown{reason: StopDeleted} - statsInc("LiveTopics", -1) } else { // Case 1.1.2: requester is NOT the owner @@ -983,6 +983,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea if t := h.topicGet(topic); t != nil { t.suspend() h.topicDel(topic) + t.exit <- &shutDown{reason: reason} statsInc("LiveTopics", -1) diff --git a/server/topic.go b/server/topic.go index a296f1875..820f9f5a0 100644 --- a/server/topic.go +++ b/server/topic.go @@ -399,7 +399,7 @@ func (t *Topic) run(hub *Hub) { t.presPubMessageCount(uid, recv, read, msg.skipSid) // Update cached count of unread messages - usersUpdateCache(uid, unread, true) + usersUpdateUnread(uid, unread, true) t.perUser[uid] = pud } @@ -589,7 +589,6 @@ func (t *Topic) run(hub *Hub) { // 2. Topic is being deleted (reason == StopDeleted) // 3. System shutdown (reason == StopShutdown, done != nil). // 4. Cluster rehashing (reason == StopRehashing) - // TODO(gene): save lastMessage value; if sd.reason == StopDeleted { if t.cat == types.TopicCatGrp { @@ -612,6 +611,8 @@ func (t *Topic) run(hub *Hub) { s.detach <- t.name } + usersRegisterTopic(t, types.ZeroUid, false) + // Report completion back to sender, if 'done' is not nil. if sd.done != nil { sd.done <- true @@ -815,6 +816,11 @@ func (t *Topic) subCommonReply(h *Hub, sreg *sessionJoin) error { } sreg.sess.queueOut(NoErrParams(sreg.pkt.id, toriginal, now, params)) + if sreg.newsub && !sreg.created { + // Register new subscription if it's not a part of topic creation. + usersRegisterTopic(nil, asUid, true) + } + return nil } @@ -1030,10 +1036,10 @@ func (t *Topic) requestSub(h *Hub, sess *Session, asUid types.Uid, asLvl auth.Le newReader := (userData.modeWant & userData.modeGiven).IsReader() if oldReader && !newReader { // Decrement unread count - usersUpdateCache(asUid, userData.readID-t.lastID, true) + usersUpdateUnread(asUid, userData.readID-t.lastID, true) } else if !oldReader && newReader { // Increment unread count - usersUpdateCache(asUid, t.lastID-userData.readID, true) + usersUpdateUnread(asUid, t.lastID-userData.readID, true) } t.notifySubChange(asUid, asUid, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid) } @@ -1183,10 +1189,10 @@ func (t *Topic) approveSub(h *Hub, sess *Session, asUid, target types.Uid, set * newReader := (userData.modeWant & userData.modeGiven).IsReader() if oldReader && !newReader { // Decrement unread count - usersUpdateCache(target, userData.readID-t.lastID, true) + usersUpdateUnread(target, userData.readID-t.lastID, true) } else if !oldReader && newReader { // Increment unread count - usersUpdateCache(target, t.lastID-userData.readID, true) + usersUpdateUnread(target, t.lastID-userData.readID, true) } t.notifySubChange(target, asUid, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid) @@ -2088,7 +2094,7 @@ func (t *Topic) replyDelSub(h *Hub, sess *Session, asUid types.Uid, del *MsgClie // Update cached unread count: negative value if (pud.modeWant & pud.modeGiven).IsReader() { - usersUpdateCache(uid, pud.readID-t.lastID, true) + usersUpdateUnread(uid, pud.readID-t.lastID, true) } // ModeUnset signifies deleted subscription as opposite to ModeNone - no access. @@ -2126,7 +2132,7 @@ func (t *Topic) replyLeaveUnsub(h *Hub, sess *Session, asUid types.Uid, id strin // Update cached unread count: negative value if (pud.modeWant & pud.modeGiven).IsReader() { - usersUpdateCache(asUid, pud.readID-t.lastID, true) + usersUpdateUnread(asUid, pud.readID-t.lastID, true) } // Send notifications. @@ -2152,6 +2158,8 @@ func (t *Topic) evictUser(uid types.Uid, unsub bool, skip string) { } else { // Grp: delete per-user data delete(t.perUser, uid) + + usersRegisterTopic(nil, uid, false) } } else { // Clear online status diff --git a/server/user.go b/server/user.go index 3fe28f22d..418ee48f5 100644 --- a/server/user.go +++ b/server/user.go @@ -364,8 +364,10 @@ func replyDelUser(s *Session, msg *ClientComMessage) { } type userUpdate struct { - // User id being updated + // Single user ID being updated uid types.Uid + // User IDs being updated + uidList []types.Uid // Unread count unread int // Treat the count as an increment as opposite to the final value. @@ -377,6 +379,7 @@ type userUpdate struct { type UserCacheEntry struct { unread int + topics int } var usersCache map[types.Uid]UserCacheEntry @@ -397,7 +400,7 @@ func usersShutdown() { } } -func usersUpdateCache(uid types.Uid, val int, inc bool) { +func usersUpdateUnread(uid types.Uid, val int, inc bool) { if globals.usersUpdate != nil && (!inc || val != 0) { select { case globals.usersUpdate <- &userUpdate{uid: uid, unread: val, inc: inc}: @@ -406,6 +409,7 @@ func usersUpdateCache(uid types.Uid, val int, inc bool) { } } +// Process push notification. func usersPush(rcpt *push.Receipt) { if globals.usersUpdate != nil { select { @@ -415,11 +419,41 @@ func usersPush(rcpt *push.Receipt) { } } +// Account users as members of an active topic. Used for cache management. +func usersRegisterTopic(t *Topic, uid types.Uid, add bool) { + var upd *userUpdate + if t != nil { + if len(t.perUser) == 0 { + // me and fnd topics + return + } + + upd = &userUpdate{uidList: make([]types.Uid, len(t.perUser))} + i := 0 + for uid := range t.perUser { + upd.uidList[i] = uid + i++ + } + } else { + upd = &userUpdate{uidList: make([]types.Uid, 1)} + upd.uidList[0] = uid + } + + upd.inc = add + + if globals.usersUpdate != nil { + select { + case globals.usersUpdate <- upd: + default: + } + } +} + // The go routine for processing updates to users cache. func userUpdater() { - updater := func(uid types.Uid, val int, inc bool) int { + unreadUpdater := func(uid types.Uid, val int, inc bool) int { uce, ok := usersCache[uid] - if !ok { + if !ok || uce.unread < 0 { count, err := store.Users.GetUnreadCount(uid) if err != nil { log.Println("users: failed to load unread count", err) @@ -447,16 +481,44 @@ func userUpdater() { if upd.pushRcpt != nil { for uid, rcptTo := range upd.pushRcpt.To { // Handle update - unread := updater(uid, 1, true) + unread := unreadUpdater(uid, 1, true) if unread >= 0 { rcptTo.Unread = unread upd.pushRcpt.To[uid] = rcptTo } } push.Push(upd.pushRcpt) - } else { - updater(upd.uid, upd.unread, upd.inc) + continue } + + if len(upd.uidList) > 0 { + for _, uid := range upd.uidList { + uce, ok := usersCache[uid] + if upd.inc { + if !ok { + // This is a registration of a new user. + // We are not loading unread count here, so set it to -1. + uce.unread = -1 + } + uce.topics++ + usersCache[uid] = uce + } else if ok { + if uce.topics > 1 { + uce.topics-- + usersCache[uid] = uce + } else { + // Remove user from cache + delete(usersCache, uid) + } + } else { + // BUG! + panic("request to unregister user which has not been registered") + } + } + continue + } + + unreadUpdater(upd.uid, upd.unread, upd.inc) } log.Println("users: shutdown")