Skip to content

Commit

Permalink
fixed pagination of subscriptions in mysql
Browse files Browse the repository at this point in the history
Rate limit · GitHub

Access has been restricted

You have triggered a rate limit.

Please wait a few minutes before you try again;
in some cases this may take up to an hour.

or-else committed Jun 10, 2021
1 parent e9f0c46 commit c234903
Showing 1 changed file with 92 additions and 48 deletions.
140 changes: 92 additions & 48 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import (
"encoding/json"
"errors"
"hash/fnv"
"sort"
"strconv"
"strings"
"time"
@@ -1426,20 +1427,23 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
}

limit := 0
ims := time.Time{}
if opts != nil {
if opts.Topic != "" {
q += " AND topic=?"
args = append(args, opts.Topic)
}

// Apply the limit only when the client does not manage the cache (or cold start).
// Otherwise have to get all subscriptions and do a manual join with users/topics.
if opts.IfModifiedSince == nil {
// Apply the limit only when the client does not manage cache.
// Otherwise have to get all subscriptions and do a manual join with users/topics.
if opts.Limit > 0 && opts.Limit < limit {
if opts.Limit > 0 && opts.Limit < a.maxResults {
limit = opts.Limit
} else {
limit = a.maxResults
}
} else {
ims = *opts.IfModifiedSince
}
} else {
limit = a.maxResults
@@ -1459,20 +1463,6 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
return nil, err
}

/*
if opts.IfModifiedSince != nil {
q += " AND updatedat>?"
args = append(args, opts.IfModifiedSince)
}
if opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
}
q += " LIMIT ?"
args = append(args, limit)
*/

// Fetch subscriptions. Two queries are needed: users table (p2p) and topics table (grp).
// Prepare a list of separate subscriptions to users vs topics
var sub t.Subscription
@@ -1488,12 +1478,11 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
sub.User = uid.String()
tcat := t.GetTopicCat(tname)

// One of 'me', 'fnd', 'sys' subscriptions, skip.
if tcat == t.TopicCatMe || tcat == t.TopicCatFnd || tcat == t.TopicCatSys {
// One of 'me', 'fnd', 'sys' subscriptions, skip.
continue

// p2p subscription, find the other user to get user.Public
} else if tcat == t.TopicCatP2P {
// P2P subscription, find the other user to get user.Public
uid1, uid2, _ := t.ParseP2P(tname)
if uid1 == uid {
usrq = append(usrq, store.DecodeUid(uid2))
@@ -1502,9 +1491,8 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
}
topq = append(topq, tname)

// grp subscription
} else {
// Convert channel names to topic names.
// Group subscription. Maybe convert channel name to topic name.
tname = t.ChnToGrp(tname)
topq = append(topq, tname)
}
@@ -1521,41 +1509,41 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
}

var subs []t.Subscription
if len(topq) > 0 || len(usrq) > 0 {
subs = make([]t.Subscription, 0, len(join))
if len(join) == 0 {
return subs, nil
}

// Fetch grp topics and join to subscriptions.
if len(topq) > 0 {
q = "SELECT createdat,updatedat,state,stateat,touchedat,name AS id,usebt,access,seqid,delid,public,tags " +
"FROM topics WHERE name IN (?)"
args = topq

q, args, _ = sqlx.In(q, topq)

if !keepDeleted {
// Optionally skip deleted topics.
q += " AND state!=?"
args = append(args, t.StateDeleted)
}

if opts != nil && opts.IfModifiedSince != nil {
if !ims.IsZero() {
// Use cache timestamp if provided: get newer entries only.
q += " AND updatedat>?"
args = append(args, opts.IfModifiedSince)
args = append(args, ims)

if limit > 0 && limit < len(topq) {
// No point in fetching more than the requested limit.
q += " ORDER BY updatedat LIMIT ?"
args = append(args, limit)
}
}

q, args, _ := sqlx.In(sql, args)
q = a.db.Rebind(q)

ctx2, cancel2 := a.getContext()
if cancel2 != nil {
defer cancel2()
}
rows, err = a.db.QueryxContext(ctx2, q, topq...)
rows, err = a.db.QueryxContext(ctx2, q, args...)
if err != nil {
return nil, err
}
@@ -1567,42 +1555,66 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
}

sub = join[top.Id]
sub.ObjHeader.MergeTimes(&top.ObjHeader)
// Check if sub.UpdatedAt needs to be adjusted to earlier time.
// top.UpdatedAt is guaranted to be after ims.
if sub.UpdatedAt.Before(ims) {
// Subscription has not changed recently, use topic's update timestamp.
sub.UpdatedAt = top.UpdatedAt
} else if sub.UpdatedAt.After(top.UpdatedAt) {
// Subscription changed after the topic, using earlier timestamp.
if !ims.IsZero() {
sub.UpdatedAt = top.UpdatedAt
}
}
sub.SetState(top.State)
sub.SetTouchedAt(top.TouchedAt)
sub.SetSeqId(top.SeqId)
if t.GetTopicCat(sub.Topic) == t.TopicCatGrp {
// all done with a grp topic
sub.SetPublic(fromJSON(top.Public))
subs = append(subs, sub)
} else {
// put back the updated value of a p2p subsription, will process further below
join[top.Id] = sub
}
// Put back the updated value of a subsription, will process further below
join[top.Id] = sub
}
if err == nil {
err = rows.Err()
}
rows.Close()
}

if err != nil {
return nil, err
}

// Fetch p2p users and join to p2p subscriptions.
if err == nil && len(usrq) > 0 {
q, usrq, _ := sqlx.In(
"SELECT id,state,createdat,updatedat,state,stateat,access,lastseen,useragent,public,tags "+
"FROM users WHERE id IN (?)",
usrq)
if len(usrq) > 0 {
q = "SELECT id,state,createdat,updatedat,state,stateat,access,lastseen,useragent,public,tags " +
"FROM users WHERE id IN (?)"
q, args, _ = sqlx.In(q, usrq)
// Optionally skip deleted users.
if !keepDeleted {
q += " AND state!=?"
usrq = append(usrq, t.StateDeleted)
args = append(args, t.StateDeleted)
}

if !ims.IsZero() {
// Use cache timestamp if provided: get newer entries only.
q += " AND updatedat>?"
args = append(args, ims)

if limit > 0 && limit < len(usrq) {
// No point in fetching more than the requested limit.
q += " ORDER BY updatedat LIMIT ?"
args = append(args, limit)
}
}

q = a.db.Rebind(q)

ctx3, cancel3 := a.getContext()
if cancel3 != nil {
defer cancel3()
}
rows, err = a.db.QueryxContext(ctx3, q, usrq...)
rows, err = a.db.QueryxContext(ctx3, q, args...)
if err != nil {
return nil, err
}
@@ -1614,14 +1626,24 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
}

uid2 := encodeUidString(usr.Id)
if sub, ok := join[uid.P2PName(uid2)]; ok {
joinOn := uid.P2PName(uid2)
if sub, ok := join[joinOn]; ok {
if sub.UpdatedAt.Before(ims) {
// Subscription has not changed recently, use user's update timestamp.
sub.UpdatedAt = usr.UpdatedAt
} else if sub.UpdatedAt.After(usr.UpdatedAt) {
// Subscription changed after the user, using earlier timestamp.
if !ims.IsZero() {
sub.UpdatedAt = usr.UpdatedAt
}
}
sub.ObjHeader.MergeTimes(&usr.ObjHeader)
sub.SetState(usr.State)
sub.SetPublic(fromJSON(usr.Public))
sub.SetWith(uid2.UserId())
sub.SetDefaultAccess(usr.Access.Auth, usr.Access.Anon)
sub.SetLastSeenAndUA(usr.LastSeen, usr.UserAgent)
subs = append(subs, sub)
join[joinOn] = sub
}
}
if err == nil {
@@ -1630,11 +1652,33 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
rows.Close()
}

if opts.IfModifiedSince == nil {
// TODO; Now that we fetched potentially more subscriptions than needed, we got to take those with the oldest modifications
subs = make([]t.Subscription, 0, len(join))
for _, sub := range join {
subs = append(subs, sub)
}

return subs, err
limit = a.maxResults
if opts != nil && opts.Limit > 0 && opts.Limit < limit {
limit = opts.Limit
}
if !ims.IsZero() || len(subs) > limit {
// Now that we fetched potentially more subscriptions than needed, we got to take those with the oldest modifications.
// Sorting in ascending order by modification time.
sort.Slice(subs, func(i, j int) bool {
return subs[i].UpdatedAt.Before(subs[j].UpdatedAt)
})
if !ims.IsZero() {
// Keep only those subscriptions which are newer than ims.
at := sort.Search(len(subs), func(i int) bool { return subs[i].UpdatedAt.After(ims) })
subs = subs[at:]
}
// Trim slice at the limit.
if len(subs) > limit {
subs = subs[:limit]
}
}

return subs, nil
}

// UsersForTopic loads users subscribed to the given topic.

0 comments on commit c234903

Please sign in to comment.