Skip to content

Commit

Permalink
Merge branch 'next' of https://github.com/tinode/chat into next
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Dec 9, 2022
2 parents 86d6d72 + 2e7d737 commit a67211e
Show file tree
Hide file tree
Showing 17 changed files with 880 additions and 492 deletions.
2 changes: 1 addition & 1 deletion build-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fi

echo "Releasing $version"

GOSRC=${GOPATH}/src/github.com/tinode
GOSRC=..

pushd ${GOSRC}/chat > /dev/null

Expand Down
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,7 @@ func (c *Cluster) gcProxySessions(activeNodes []string) {
for name := range c.nodes {
allNodes = append(allNodes, name)
}
_, failedNodes := stringSliceDelta(allNodes, activeNodes)
_, failedNodes, _ := stringSliceDelta(allNodes, activeNodes)
for _, node := range failedNodes {
// Iterate sessions of a failed node
c.gcProxySessionsForNode(node)
Expand Down
4 changes: 4 additions & 0 deletions server/db/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Adapter interface {
// the R permission. If read fails, the counts are still returned with the original
// user IDs but with the unread count undefined and non-nil error.
UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error)
// UserGetUnvalidated returns a list of uids which have unvalidated credentials
// and haven't been updated since lastUpdatedBefore,
// their auth levels and a comma separated list of these unvalidated credential names.
UserGetUnvalidated(lastUpdatedBefore time.Time) ([]t.Uid, []auth.Level, []string, error)

// Credential management

Expand Down
95 changes: 95 additions & 0 deletions server/db/mongodb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,101 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
return counts, nil
}

// UserGetUnvalidated returns a list of uids which have unvalidated credentials
// and haven't been updated since lastUpdatedBefore.
func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time) ([]t.Uid, []auth.Level, []string, error) {
/*
Query:
db.users.aggregate([
{ $match: { updatedat: { $lt: ISODate("2022-12-09T01:26:15.819Z") } } },
{ $lookup: { from: "auth", localField: "_id", foreignField: "userid", as: "fauth"} },
{ $replaceRoot: { newRoot: { $mergeObjects: [ {$arrayElemAt: [ "$fauth", 0 ]} , "$$ROOT" ] } } },
{ $lookup: { from: "credentials", localField: "_id", foreignField: "user", as: "fcred"} },
{ $unwind: "$fcred" },
{ $replaceRoot: { newRoot: { $mergeObjects: [ "$fcred", "$$ROOT" ] } } },
{ $match: { done: { $eq: false } } },
{ $project: { _id: 0, userid: 1, authlvl: 1, method: 1 } },
{ $group: { _id: {userid:"$userid", authlvl:"$authlvl"}, meth: { $push: { $concat: ["$method"] } } } }
])
Result: [{ _id: { userid: 'Q_aVB7uiWvQ', authlvl: 30 }, meth: [ 'email', 'tel' ] },
{ _id: { userid: 'mbyUHrLQcjw', authlvl: 20 }, meth: [ 'email', 'tel' ] }]
*/
pipeline := b.A{
b.M{"$match": b.M{"updatedat": b.M{"$lt": lastUpdatedBefore}}},
// Join with auth collection
b.M{"$lookup": b.M{
"from": "auth",
"localField": "_id",
"foreignField": "userid",
"as": "fauth"},
},
// Merge two documents into one
b.M{"$replaceRoot": b.M{"newRoot": b.M{"$mergeObjects": b.A{b.M{"$arrayElemAt": b.A{"$fauth", 0}}, "$$ROOT"}}}},

// Join with credentials collection
b.M{"$lookup": b.M{
"from": "credentials",
"localField": "_id",
"foreignField": "user",
"as": "fcred"},
},
// Unwind credentials
b.M{"$unwind": "$fcred"},
// Merge documents into one
b.M{"$replaceRoot": b.M{"newRoot": b.M{"$mergeObjects": b.A{"$fcred", "$$ROOT"}}}},
b.M{"$match": b.M{"done": b.M{"$eq": false}}},

// Remove unused fields.
b.M{"$project": b.M{"_id": 0, "userid": 1, "authlvl": 1, "method": 1}},
// GROUP BY userid and authlvl.
b.M{"$group": b.M{"_id": b.M{"userid": "$userid", "authlvl": "$authlvl"}, "meth": b.M{"$push": b.M{"$concat": b.A{"$method"}}}}},
}
cur, err := a.db.Collection("users").Aggregate(a.ctx, pipeline)
if err != nil {
return nil, nil, nil, err
}
defer cur.Close(a.ctx)

var uids []t.Uid
var authLvls []auth.Level
var unvalidatedCreds []string
for cur.Next(a.ctx) {
var oneUser struct {
Id map[string]interface{} `bson:"_id"`
Method []string `bson:"meth"`
}
cur.Decode(&oneUser)
var uid t.Uid
if uidInt, found := oneUser.Id["userid"]; found {
if uidStr, ok := uidInt.(string); ok {
uid = t.ParseUid(uidStr)
} else {
return nil, nil, nil, errors.New("Could not deserialize uid field")
}
} else {
return nil, nil, nil, errors.New("userid field not found in result")
}

var authLvl int
if authInt, found := oneUser.Id["authlvl"]; found {
if authNum, ok := authInt.(int32); ok {
authLvl = int(authNum)
} else {
return nil, nil, nil, errors.New("Could not deserialize authLvl field")
}
} else {
return nil, nil, nil, errors.New("authlvl field not found in result")
}

creds := strings.Join(oneUser.Method, ",")
uids = append(uids, uid)
authLvls = append(authLvls, auth.Level(authLvl))
unvalidatedCreds = append(unvalidatedCreds, creds)
}

return uids, authLvls, unvalidatedCreds, err
}

// Credential management

// CredUpsert adds or updates a validation record. Returns true if inserted, false if updated.
Expand Down
40 changes: 40 additions & 0 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,46 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
return counts, err
}

// UserGetUnvalidated returns a list of uids which have unvalidated credentials
// and haven't been updated since lastUpdatedBefore.
func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time) ([]t.Uid, []auth.Level, []string, error) {
var uids []t.Uid
var authLvls []auth.Level
var unvalidatedCreds []string

ctx, cancel := a.getContext()
if cancel != nil {
defer cancel()
}

rows, err := a.db.QueryxContext(ctx,
"SELECT u.id, a.authlvl, GROUP_CONCAT(c.method ORDER BY c.method SEPARATOR ',') "+
"FROM users u JOIN credentials c ON u.id = c.userid JOIN auth a ON u.id = a.userid "+
"WHERE u.updatedat<? AND c.done = 0 GROUP BY u.id, a.authlvl", lastUpdatedBefore)
if err != nil {
return nil, nil, nil, err
}

var userId int64
var authLvl int
var creds string
for rows.Next() {
if err = rows.Scan(&userId, &authLvl, &creds); err != nil {
break
}
uid := store.EncodeUid(userId)
uids = append(uids, uid)
authLvls = append(authLvls, auth.Level(authLvl))
unvalidatedCreds = append(unvalidatedCreds, creds)
}
if err == nil {
err = rows.Err()
}
rows.Close()

return uids, authLvls, unvalidatedCreds, err
}

// *****************************

func (a *adapter) topicCreate(tx *sqlx.Tx, topic *t.Topic) error {
Expand Down
71 changes: 71 additions & 0 deletions server/db/rethinkdb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1073,6 +1073,77 @@ func (a *adapter) UserUnreadCount(ids ...t.Uid) (map[t.Uid]int, error) {
return counts, err
}

// UserGetUnvalidated returns a list of uids which have unvalidated credentials
// and haven't been updated since lastUpdatedBefore.
func (a *adapter) UserGetUnvalidated(lastUpdatedBefore time.Time) ([]t.Uid, []auth.Level, []string, error) {
/*
Query:
r.db('tinode').table('users')
.filter(r.row('UpdatedAt').lt(new Date('???')))
.eqJoin('Id', r.db('tinode').table('auth'), {index: 'userid'})
.eqJoin(function(row) { return row('left')('Id') }, r.db('tinode').table('credentials'), {index: 'User'})
.filter(r.row('right')('Done').eq(false))
.map({
'id': r.row('left')('left')('Id'),
'authLvl': r.row('left')('right')('authLvl'),
'method': r.row('right')('Method')
})
.group('id', 'authLvl')
.merge(r.row('method'))
Result: [{ "group": [ "FM-pU6xs3N0" , 20 ], "reduction": [ "email" , "tel" ] },
{ "group": [ "H-JTBLtkO9Y" , 30 ], "reduction": [ "tel" , "email" ] }]
*/
cursor, err := rdb.DB(a.dbName).Table("users").
Filter(rdb.Row.Field("UpdatedAt").Lt(lastUpdatedBefore)).
EqJoin("Id", rdb.DB(a.dbName).Table("auth"), rdb.EqJoinOpts{Index: "userid"}).
EqJoin(func(row rdb.Term) rdb.Term { return row.Field("left").Field("Id") },
rdb.DB(a.dbName).Table("credentials"), rdb.EqJoinOpts{Index: "User"}).
Filter(rdb.Row.Field("right").Field("Done").Eq(false)).
Map(map[string]interface{}{
"id": rdb.Row.Field("left").Field("left").Field("Id"),
"authLvl": rdb.Row.Field("left").Field("right").Field("authLvl"),
"method": rdb.Row.Field("right").Field("Method"),
}).
Group("id", "authLvl").
Merge(rdb.Row.Field("method")).
Run(a.conn)
if err != nil {
return nil, nil, nil, err
}
defer cursor.Close()

var rec struct {
// string, int
Group [2]interface{}
Reduction []string
}
var uids []t.Uid
var authLvls []auth.Level
var unvalidatedCreds []string
for cursor.Next(&rec) {
var uid t.Uid
if uidStr, ok := rec.Group[0].(string); ok {
uid = t.ParseUid(uidStr)
} else {
return nil, nil, nil, errors.New("Could not deserialize uid field")
}
var authLvl int
if authNum, ok := rec.Group[1].(float64); ok {
authLvl = int(authNum)
} else {
return nil, nil, nil, errors.New("Could not deserialize authLvl field")
}
creds := strings.Join(rec.Reduction, ",")
uids = append(uids, uid)
authLvls = append(authLvls, auth.Level(authLvl))
unvalidatedCreds = append(unvalidatedCreds, creds)
}
err = cursor.Err()

return uids, authLvls, unvalidatedCreds, err
}

// *****************************

// TopicCreate creates a topic from template
Expand Down
25 changes: 25 additions & 0 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ type validatorConfig struct {
Config json.RawMessage `json:"config"`
}

// Stale unvalidated user account GC config.
type accountGcConfig struct {
// How often to run GC (seconds).
GcPeriod int `json:"gc_period"`
// Number of accounts to delete in one pass.
GcBlockSize int `json:"gc_block_size"`
// Minimum hours since account was last modified.
GcMinAccountAge int `json:"gc_min_account_age"`
}

// Large file handler config.
type mediaConfig struct {
// The name of the handler to use for file uploads.
Expand Down Expand Up @@ -265,6 +275,7 @@ type configType struct {
TLS json.RawMessage `json:"tls"`
Auth map[string]json.RawMessage `json:"auth_config"`
Validator map[string]*validatorConfig `json:"acc_validation"`
AccountGC *accountGcConfig `json:"acc_gc_config"`
Media *mediaConfig `json:"media"`
WebRTC json.RawMessage `json:"webrtc"`
}
Expand Down Expand Up @@ -545,6 +556,20 @@ func main() {
}
}

// Stale unvalidated user account garbage collection.
if config.AccountGC != nil {
if config.AccountGC.GcPeriod > 0 && config.AccountGC.GcBlockSize > 0 &&
config.AccountGC.GcMinAccountAge > 0 {
gcPeriod := time.Second * time.Duration(config.AccountGC.GcPeriod)
stopAccountGc := garbageCollectUsers(gcPeriod, config.AccountGC.GcBlockSize, config.AccountGC.GcMinAccountAge)

defer func() {
stopAccountGc <- true
logs.Info.Println("Stopped account garbage collector")
}()
}
}

pushHandlers, err := push.Init(config.Push)
if err != nil {
logs.Err.Fatal("Failed to initialize push notifications:", err)
Expand Down
6 changes: 3 additions & 3 deletions server/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,10 @@ func ParsePluginFilter(s *string, filterBy int) (*PluginFilter, error) {
}

// PluginRPCFilterConfig filters for an individual RPC call. Filter strings are formatted as follows:
// <comma separated list of packet names> : <comma separated list of topics or topic types> : <actions (combination of C U D)>
// <comma separated list of packet names> ; <comma separated list of topics or topic types> ; <actions (combination of C U D)>
// For instance:
// "acc,login::CU" - grab packets {acc} or {login}; no filtering by topic, Create or Update action
// "pub,pres:me,p2p:"
// "acc,login;;CU" - grab packets {acc} or {login}; no filtering by topic, Create or Update action
// "pub,pres;me,p2p;"
type pluginRPCFilterConfig struct {
// Filter by packet name, topic type [or exact name - not supported yet]. 2D: "pub,pres;p2p,me"
FireHose *string `json:"fire_hose"`
Expand Down
2 changes: 1 addition & 1 deletion server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func (s *Session) login(msg *ClientComMessage) {
// Check responses. Ignore invalid responses, just keep cred unvalidated.
if validated, _, err = validatedCreds(rec.Uid, rec.AuthLevel, msg.Login.Cred, false); err == nil {
// Get a list of credentials which have not been validated.
_, missing = stringSliceDelta(globals.authValidators[rec.AuthLevel], validated)
_, missing, _ = stringSliceDelta(globals.authValidators[rec.AuthLevel], validated)
}
}
if err != nil {
Expand Down
Loading

0 comments on commit a67211e

Please sign in to comment.