Skip to content

Commit

Permalink
code review fixes and a little bit of new functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Jul 22, 2020
1 parent e283cd7 commit 267e1c9
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 32 deletions.
4 changes: 2 additions & 2 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -414,11 +414,11 @@ A group topic is created by sending a `{sub}` message with the topic field set t

A `channel` topic is different from the non-channel group topic in the following ways:

* Channel-enabled topic is created by sending `{sub topic="nch"}`. Sending `{sub topic="new"}` will create a group topic without enabling channel functionality.
* Channel topic is created by sending `{sub topic="nch"}`. Sending `{sub topic="new"}` will create a group topic without enabling channel functionality.
* Sending `{sub topic="chnAbC123"}` will create a `reader` subscription to a channel. A non-channel topic will reject such subscription request.
* When searching for topics using [`fnd`](#fnd-and-tags-finding-users-and-topics), channels will show addresses with `chn` prefixes, non-channel topic will show with `grp` prefixes.
* Default permissions for a channel and non-channel group topics are different: channel group topic grants no permissions at all.
* A subscriber joining or leaving the topic generates a `{pres}` message to all other subscribers who are currently in the joined state with the topic and have appropriate permissions. Reader joining or leaving the channel generates no `{pres}` message.
* A subscriber joining or leaving the topic (regular or channel-enabled) generates a `{pres}` message to all other subscribers who are currently in the joined state with the topic and have appropriate permissions. Reader joining or leaving the channel generates no `{pres}` message.

### `sys` Topic

Expand Down
2 changes: 1 addition & 1 deletion server/db/mongodb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1271,7 +1271,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
// grp subscription
} else {
// Convert channel names to topic names.
tname = t.GrpFromChn(tname)
tname = t.ChnToGrp(tname)
topq = append(topq, tname)
}
sub.Private = unmarshalBsonD(sub.Private)
Expand Down
6 changes: 3 additions & 3 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ func (a *adapter) UserUnreadCount(uid t.Uid) (int, error) {

func (a *adapter) topicCreate(tx *sqlx.Tx, topic *t.Topic) error {
_, err := tx.Exec("INSERT INTO topics(createdat,updatedat,touchedat,state,name,usebt,owner,access,public,tags) "+
"VALUES(?,?,?,?,?,?,?,?,?)",
"VALUES(?,?,?,?,?,?,?,?,?,?)",
topic.CreatedAt, topic.UpdatedAt, topic.TouchedAt, topic.State, topic.Id, topic.UseBt,
store.DecodeUid(t.ParseUid(topic.Owner)), topic.Access, toJSON(topic.Public), topic.Tags)
if err != nil {
Expand Down Expand Up @@ -1317,7 +1317,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool, opts *t.QueryOpt) (
// grp subscription
} else {
// Convert channel names to topic names.
tname = t.GrpFromChn(tname)
tname = t.ChnToGrp(tname)
topq = append(topq, tname)
}
sub.Private = fromJSON(sub.Private)
Expand Down Expand Up @@ -2004,7 +2004,7 @@ func (a *adapter) FindTopics(req [][]string, opt []string) ([]t.Subscription, er
}

if isChan != 0 {
sub.Topic = t.ChnFromGrp(sub.Topic)
sub.Topic = t.GrpToChn(sub.Topic)
}
sub.SetPublic(fromJSON(public))
sub.SetDefaultAccess(access.Auth, access.Anon)
Expand Down
9 changes: 5 additions & 4 deletions server/db/rethinkdb/schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Fields:
* `Access` user's default access level for peer-to-peer topics
* `Auth`, `Anon` default permissions for authenticated and anonymous users
* `Public` application-defined data
* `State` currently unused
* `State` state of the user: normal, disabled, deleted
* `StateAt` timestamp when the state was last updated or NULL
* `LastSeen` timestamp when the user was last online
* `UserAgent` client User-Agent used when last online
* `Tags` unique strings for user discovery
Expand Down Expand Up @@ -99,10 +100,10 @@ Fields:
* `Auth`, `Anon` permissions for authenticated and anonymous users respectively
* `Owner` ID of the user who owns the topic
* `Public` application-defined data
* `State` currently unused
* `State` state of the topic: normal, disabled, deleted
* `SeqId` sequential ID of the last message
* `DelId` topic-sequential ID of the deletion operation
* `UseBt` currently unused
* `UseBt` indicator that channel functionality is enabled in the topic

Indexes:
* `Id` primary key
Expand Down Expand Up @@ -328,4 +329,4 @@ Sample:
"UpdatedAt": Sun Jun 10 2018 16:38:45 GMT+00:00 ,
"User": "7j-RR1V7O3Y"
}
```
```
50 changes: 28 additions & 22 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"errors"
"log"
"sort"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -1011,13 +1012,11 @@ func (t *Topic) subscriptionReply(h *Hub, join *sessionJoin) error {
var err error
var changed bool
// Create new subscription or modify an existing one.
if changed, err = t.thisUserSub(h, join.sess, asUid, asLvl, join.pkt.Id, mode, private); err != nil {
if changed, err = t.thisUserSub(h, join, asUid, mode, private); err != nil {
return err
}

toriginal := t.original(asUid)
params := map[string]interface{}{}

if changed {
pud := t.perUser[asUid]
// Report back the assigned access mode.
Expand All @@ -1027,6 +1026,8 @@ func (t *Topic) subscriptionReply(h *Hub, join *sessionJoin) error {
Mode: (pud.modeGiven & pud.modeWant).String()}
}

toriginal := t.original(asUid)

// When a group topic is created, it's given a temporary name by the client.
// Then this name changes. Report back the original name here.
if msgsub.Created && join.pkt.Original != toriginal {
Expand All @@ -1045,29 +1046,32 @@ func (t *Topic) subscriptionReply(h *Hub, join *sessionJoin) error {

// User requests or updates a self-subscription to a topic. Called as a
// result of {sub} or {meta set=sub}.
// Returns changed == true if user's accessmode has changed.
// Returns changed == true if user's access mode has changed.
//
// h - hub
// sess - originating session
// asUid - id of the user making the request
// asLvl - access level of the user making the request
// pktID - id of {sub} or {set} packet
// want - requested access mode
// private - private value to assign to the subscription
// background - presence notifications are deferred
// sessOverrides - session param overrides
// h - hub
// sess - originating session
// asUid - id of the user making the request
// asLvl - access level of the user making the request
// want - requested access mode
// private - private value to assign to the subscription
// background - presence notifications are deferred
// sessOverrides - session param overrides
//
// Handle these cases:
// A. User is trying to subscribe for the first time (no subscription)
// B. User is already subscribed, just joining without changing anything
// C. User is responding to an earlier invite (modeWant was "N" in subscription)
// D. User is already subscribed, changing modeWant
// E. User is accepting ownership transfer (requesting ownership transfer is not permitted)
func (t *Topic) thisUserSub(h *Hub, sess *Session, asUid types.Uid, asLvl auth.Level,
pktID, want string, private interface{}) (bool, error) {
// A. User is trying to subscribe for the first time (no subscription).
// B. User is already subscribed, just joining without changing anything.
// C. User is responding to an earlier invite (modeWant was "N" in subscription).
// D. User is already subscribed, changing modeWant.
// E. User is accepting ownership transfer (requesting ownership transfer is not permitted).
// In case of a group topic the user may be a reader or a full subscriber.
func (t *Topic) thisUserSub(h *Hub, join *sessionJoin, asUid types.Uid, want string,
private interface{}) (bool, error) {

now := types.TimeNow()
toriginal := t.original(asUid)

sess := join.sess
pktID := join.pkt.Id
asLvl := auth.Level(join.pkt.AuthLvl)

var changed bool

Expand All @@ -1079,11 +1083,13 @@ func (t *Topic) thisUserSub(h *Hub, sess *Session, asUid types.Uid, asLvl auth.L
modeWant := types.ModeUnset
if want != "" {
if err := modeWant.UnmarshalText([]byte(want)); err != nil {
sess.queueOut(ErrMalformed(pktID, toriginal, now))
sess.queueOut(ErrMalformed(pktID, join.pkt.Original, now))
return changed, err
}
}

toriginal := t.original(asUid)

// Check if it's an attempt at a new subscription to the topic.
// It could be an actual subscription (IsJoiner() == true) or a ban (IsJoiner() == false)
userData, existingSub := t.perUser[asUid]
Expand Down

0 comments on commit 267e1c9

Please sign in to comment.