Skip to content

Commit

Permalink
prevent unbounded growth of user cache
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Apr 8, 2019
1 parent a539cf3 commit 617138e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 16 deletions.
3 changes: 2 additions & 1 deletion server/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ func topicInit(sreg *sessionJoin, h *Hub) {

statsInc("LiveTopics", 1)
statsInc("TotalTopics", 1)
usersRegisterTopic(t, types.ZeroUid, true)

go t.run(h)

Expand Down Expand Up @@ -889,7 +890,6 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea

h.topicDel(topic)
t.exit <- &shutDown{reason: StopDeleted}

statsInc("LiveTopics", -1)
} else {
// Case 1.1.2: requester is NOT the owner
Expand Down Expand Up @@ -983,6 +983,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea
if t := h.topicGet(topic); t != nil {
t.suspend()
h.topicDel(topic)

t.exit <- &shutDown{reason: reason}

statsInc("LiveTopics", -1)
Expand Down
24 changes: 16 additions & 8 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (t *Topic) run(hub *Hub) {
t.presPubMessageCount(uid, recv, read, msg.skipSid)

// Update cached count of unread messages
usersUpdateCache(uid, unread, true)
usersUpdateUnread(uid, unread, true)

t.perUser[uid] = pud
}
Expand Down Expand Up @@ -589,7 +589,6 @@ func (t *Topic) run(hub *Hub) {
// 2. Topic is being deleted (reason == StopDeleted)
// 3. System shutdown (reason == StopShutdown, done != nil).
// 4. Cluster rehashing (reason == StopRehashing)
// TODO(gene): save lastMessage value;

if sd.reason == StopDeleted {
if t.cat == types.TopicCatGrp {
Expand All @@ -612,6 +611,8 @@ func (t *Topic) run(hub *Hub) {
s.detach <- t.name
}

usersRegisterTopic(t, types.ZeroUid, false)

// Report completion back to sender, if 'done' is not nil.
if sd.done != nil {
sd.done <- true
Expand Down Expand Up @@ -815,6 +816,11 @@ func (t *Topic) subCommonReply(h *Hub, sreg *sessionJoin) error {
}
sreg.sess.queueOut(NoErrParams(sreg.pkt.id, toriginal, now, params))

if sreg.newsub && !sreg.created {
// Register new subscription if it's not a part of topic creation.
usersRegisterTopic(nil, asUid, true)
}

return nil
}

Expand Down Expand Up @@ -1030,10 +1036,10 @@ func (t *Topic) requestSub(h *Hub, sess *Session, asUid types.Uid, asLvl auth.Le
newReader := (userData.modeWant & userData.modeGiven).IsReader()
if oldReader && !newReader {
// Decrement unread count
usersUpdateCache(asUid, userData.readID-t.lastID, true)
usersUpdateUnread(asUid, userData.readID-t.lastID, true)
} else if !oldReader && newReader {
// Increment unread count
usersUpdateCache(asUid, t.lastID-userData.readID, true)
usersUpdateUnread(asUid, t.lastID-userData.readID, true)
}
t.notifySubChange(asUid, asUid, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid)
}
Expand Down Expand Up @@ -1183,10 +1189,10 @@ func (t *Topic) approveSub(h *Hub, sess *Session, asUid, target types.Uid, set *
newReader := (userData.modeWant & userData.modeGiven).IsReader()
if oldReader && !newReader {
// Decrement unread count
usersUpdateCache(target, userData.readID-t.lastID, true)
usersUpdateUnread(target, userData.readID-t.lastID, true)
} else if !oldReader && newReader {
// Increment unread count
usersUpdateCache(target, t.lastID-userData.readID, true)
usersUpdateUnread(target, t.lastID-userData.readID, true)
}

t.notifySubChange(target, asUid, oldWant, oldGiven, userData.modeWant, userData.modeGiven, sess.sid)
Expand Down Expand Up @@ -2088,7 +2094,7 @@ func (t *Topic) replyDelSub(h *Hub, sess *Session, asUid types.Uid, del *MsgClie

// Update cached unread count: negative value
if (pud.modeWant & pud.modeGiven).IsReader() {
usersUpdateCache(uid, pud.readID-t.lastID, true)
usersUpdateUnread(uid, pud.readID-t.lastID, true)
}

// ModeUnset signifies deleted subscription as opposite to ModeNone - no access.
Expand Down Expand Up @@ -2126,7 +2132,7 @@ func (t *Topic) replyLeaveUnsub(h *Hub, sess *Session, asUid types.Uid, id strin

// Update cached unread count: negative value
if (pud.modeWant & pud.modeGiven).IsReader() {
usersUpdateCache(asUid, pud.readID-t.lastID, true)
usersUpdateUnread(asUid, pud.readID-t.lastID, true)
}

// Send notifications.
Expand All @@ -2152,6 +2158,8 @@ func (t *Topic) evictUser(uid types.Uid, unsub bool, skip string) {
} else {
// Grp: delete per-user data
delete(t.perUser, uid)

usersRegisterTopic(nil, uid, false)
}
} else {
// Clear online status
Expand Down
76 changes: 69 additions & 7 deletions server/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,10 @@ func replyDelUser(s *Session, msg *ClientComMessage) {
}

type userUpdate struct {
// User id being updated
// Single user ID being updated
uid types.Uid
// User IDs being updated
uidList []types.Uid
// Unread count
unread int
// Treat the count as an increment as opposite to the final value.
Expand All @@ -377,6 +379,7 @@ type userUpdate struct {

type UserCacheEntry struct {
unread int
topics int
}

var usersCache map[types.Uid]UserCacheEntry
Expand All @@ -397,7 +400,7 @@ func usersShutdown() {
}
}

func usersUpdateCache(uid types.Uid, val int, inc bool) {
func usersUpdateUnread(uid types.Uid, val int, inc bool) {
if globals.usersUpdate != nil && (!inc || val != 0) {
select {
case globals.usersUpdate <- &userUpdate{uid: uid, unread: val, inc: inc}:
Expand All @@ -406,6 +409,7 @@ func usersUpdateCache(uid types.Uid, val int, inc bool) {
}
}

// Process push notification.
func usersPush(rcpt *push.Receipt) {
if globals.usersUpdate != nil {
select {
Expand All @@ -415,11 +419,41 @@ func usersPush(rcpt *push.Receipt) {
}
}

// Account users as members of an active topic. Used for cache management.
func usersRegisterTopic(t *Topic, uid types.Uid, add bool) {
var upd *userUpdate
if t != nil {
if len(t.perUser) == 0 {
// me and fnd topics
return
}

upd = &userUpdate{uidList: make([]types.Uid, len(t.perUser))}
i := 0
for uid := range t.perUser {
upd.uidList[i] = uid
i++
}
} else {
upd = &userUpdate{uidList: make([]types.Uid, 1)}
upd.uidList[0] = uid
}

upd.inc = add

if globals.usersUpdate != nil {
select {
case globals.usersUpdate <- upd:
default:
}
}
}

// The go routine for processing updates to users cache.
func userUpdater() {
updater := func(uid types.Uid, val int, inc bool) int {
unreadUpdater := func(uid types.Uid, val int, inc bool) int {
uce, ok := usersCache[uid]
if !ok {
if !ok || uce.unread < 0 {
count, err := store.Users.GetUnreadCount(uid)
if err != nil {
log.Println("users: failed to load unread count", err)
Expand Down Expand Up @@ -447,16 +481,44 @@ func userUpdater() {
if upd.pushRcpt != nil {
for uid, rcptTo := range upd.pushRcpt.To {
// Handle update
unread := updater(uid, 1, true)
unread := unreadUpdater(uid, 1, true)
if unread >= 0 {
rcptTo.Unread = unread
upd.pushRcpt.To[uid] = rcptTo
}
}
push.Push(upd.pushRcpt)
} else {
updater(upd.uid, upd.unread, upd.inc)
continue
}

if len(upd.uidList) > 0 {
for _, uid := range upd.uidList {
uce, ok := usersCache[uid]
if upd.inc {
if !ok {
// This is a registration of a new user.
// We are not loading unread count here, so set it to -1.
uce.unread = -1
}
uce.topics++
usersCache[uid] = uce
} else if ok {
if uce.topics > 1 {
uce.topics--
usersCache[uid] = uce
} else {
// Remove user from cache
delete(usersCache, uid)
}
} else {
// BUG!
panic("request to unregister user which has not been registered")
}
}
continue
}

unreadUpdater(upd.uid, upd.unread, upd.inc)
}

log.Println("users: shutdown")
Expand Down

0 comments on commit 617138e

Please sign in to comment.