Skip to content

Commit

Permalink
mysql is code-complete
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Jan 31, 2020
1 parent a60f296 commit 6655b76
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 29 deletions.
10 changes: 7 additions & 3 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Each user is assigned a unique ID. The IDs are composed as `usr` followed by bas

* `created`: timestamp when the user record was created
* `updated`: timestamp of when user's `public` was last updated
* `status`: state of the account; two states are currently defined: `ok` and `suspended`
* `status`: state of the account
* `username`: unique string used in `basic` authentication; username is not accessible to other users
* `defacs`: object describing user's default access mode for peer to peer conversations with authenticated and anonymous users; see [Access control](#access-control) for details
* `auth`: default access mode for authenticated `auth` users
Expand All @@ -157,9 +157,13 @@ Each user is assigned a unique ID. The IDs are composed as `usr` followed by bas
* `private`: an application-defined object that is unique to the current user and accessible only by the user.
* `tags`: [discovery](#fnd-and-tags-finding-users-and-topics) and credentials.

A user account has a state. By default the state is `ok` which means the account is not restricted in any way and can be used normally. The other state is `suspended` which will prevent the user from accessing the account.
User's account has a state. The following states are defined:
* `ok` (normal): the default state which means the account is not restricted in any way and can be used normally;
* `susp` (suspended): the user is prevented from accessing the account as well as not found through [search](#fnd-and-tags-finding-users-and-topics); the state can be assigned by the administrator and fully reversible.
* `del` (soft-deleted): user is marked as deleted but user's data is retained; un-deleting the user is not currenly supported.
* `undef` (undefined): used internally by authenticators; should not be used elsewhere.

A user may maintain multiple simultaneous connections (sessions) with the server. Each session is tagged with a client-provided `User Agent` string intended to differentiate client software.
A user may maintain multisple simultaneous connections (sessions) with the server. Each session is tagged with a client-provided `User Agent` string intended to differentiate client software.

Logging out is not supported by design. If an application needs to change the user, it should open a new connection and authenticate it with the new user credentials.

Expand Down
36 changes: 36 additions & 0 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,34 @@ func (a *adapter) UserDelete(uid t.Uid, hard bool) error {
return tx.Commit()
}

// topicStateForUser is called by UserUpdate when the update contains state change.
func (a *adapter) topicStateForUser(tx *sqlx.Tx, decoded_uid int64, now time.Time, update interface{}) error {
var err error

state, ok := update.(t.ObjState)
if !ok {
return t.ErrMalformed
}

if now.IsZero() {
now = t.TimeNow()
}

// Change state of all topics where the user is the owner.
if _, err = tx.Exec("UPDATE topics SET state=?, stateat=? WHERE owner=?", state, now, decoded_uid); err != nil {
return err
}

// Change state of p2p topics with the user (p2p topic's owner is 0)
if _, err = tx.Exec("UPDATE topics LEFT JOIN subscriptions ON topics.name=subscriptions.topic "+
"SET topics.state=?, topics.stateat=? WHERE topics.owner=0 AND subscriptions.user=?",
state, now, decoded_uid); err != nil {
return err
}

return nil
}

// UserUpdate updates user object.
func (a *adapter) UserUpdate(uid t.Uid, update map[string]interface{}) error {
tx, err := a.db.Beginx()
Expand All @@ -970,6 +998,14 @@ func (a *adapter) UserUpdate(uid t.Uid, update map[string]interface{}) error {
return err
}

if state, ok := update["State"]; ok {
now, _ := update["StateAt"].(time.Time)
err = a.topicStateForUser(tx, decoded_uid, now, state)
if err != nil {
return err
}
}

// Tags are also stored in a separate table
if tags := extractTags(update); tags != nil {
// First delete all user tags
Expand Down
19 changes: 8 additions & 11 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,18 +697,15 @@ func (s *Session) login(msg *ClientComMessage) {
return
}

// If authenticator did not check user state, it returns state "undef".
// Then the user state is checked here.
// If authenticator did not check user state, it returns state "undef". If so, check user state here.
if rec.State == types.StateUndefined {
// Check if user's account is suspended.
user, err := store.Users.Get(rec.Uid)
if err == nil {
if user == nil {
err = types.ErrUserNotFound
} else if user.State != types.StateOK {
err = types.ErrPermissionDenied
}
}
rec.State, err = userGetState(rec.Uid)
}
if err == nil && rec.State != types.StateOK {
err = types.ErrPermissionDenied
}

if err != nil {
log.Println("s.login: user state check failed", rec.Uid, err, s.sid)
s.queueOut(decodeStoreError(err, msg.id, "", msg.timestamp, nil))
return
Expand Down
8 changes: 8 additions & 0 deletions server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ func (UsersObjMapper) UpdateTags(uid types.Uid, add, remove, reset []string) ([]
return adp.UserUpdateTags(uid, add, remove, reset)
}

// UpdateState changes user's state and state of some topics associated with the user.
func (UsersObjMapper) UpdateState(uid types.Uid, state types.ObjState) error {
update := map[string]interface{}{
"State": state,
"StateAt": types.TimeNow()}
return adp.UserUpdate(uid, update)
}

// GetSubs loads a list of subscriptions for the given user.
// Does not load Public, does not load deleted subscriptions.
func (UsersObjMapper) GetSubs(id types.Uid, opts *types.QueryOpt) ([]types.Subscription, error) {
Expand Down
32 changes: 17 additions & 15 deletions server/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func replyCreateUser(s *Session, msg *ClientComMessage, rec *auth.Rec) {
}

state, err := types.NewObjState(msg.Acc.State)
if err != nil {
if err != nil || state == types.StateUndefined {
log.Println("create user: invalid account state", err, s.sid)
s.queueOut(ErrMalformed(msg.id, "", msg.timestamp))
return
Expand Down Expand Up @@ -490,14 +490,14 @@ func deleteCred(uid types.Uid, authLvl auth.Level, cred *MsgCredClient) ([]strin
}

// Change user state: suspended/normal (ok).
// 1. Disable/enable logins.
// 2. If disabling, evict user's sessions.
// 1. Not needed -- Disable/enable logins (state checked after login).
// 2. If suspending, evict user's sessions. Skip this step if resuming.
// 3. Suspend/activate p2p with the user.
// 4. Suspend/activate grp topics where the user is the owner.
// 5. Update user's DB record.
func changeUserState(s *Session, uid types.Uid, user *types.User, msg *ClientComMessage) (bool, error) {
state, err := types.NewObjState(msg.Acc.State)
if err != nil {
if err != nil || state == types.StateUndefined {
log.Println("replyUpdateUser: invalid account state", s.sid)
return false, types.ErrMalformed
}
Expand All @@ -507,12 +507,6 @@ func changeUserState(s *Session, uid types.Uid, user *types.User, msg *ClientCom
return false, nil
}

// TODO: Disable/enable all authenticators.
authnames := store.GetAuthNames()
for _, name := range authnames {
log.Println("changeUserState: TODO change auth", uid.UserId(), name, state, s.sid)
}

if state != types.StateOK {
// Terminate all sessions.
globals.sessionStore.EvictUser(uid, "")
Expand All @@ -521,12 +515,8 @@ func changeUserState(s *Session, uid types.Uid, user *types.User, msg *ClientCom
// Update state of all loaded in memory user's p2p & grp-owner topics.
globals.hub.meta <- &metaReq{forUser: uid, state: state, sess: s}

// TODO: Suspend/activate p2p with the user.

// TODO: Suspend/activate grp topics where the user is the owner.

user.State = state
err = store.Users.Update(uid, map[string]interface{}{"State": user.State})
err = store.Users.UpdateState(uid, user.State)
return true, err
}

Expand Down Expand Up @@ -612,6 +602,18 @@ func replyDelUser(s *Session, msg *ClientComMessage) {
}
}

// Read user's state from DB.
func userGetState(uid types.Uid) (types.ObjState, error) {
user, err := store.Users.Get(uid)
if err != nil {
return types.StateUndefined, err
}
if user == nil {
return types.StateUndefined, types.ErrUserNotFound
}
return user.State, nil
}

// UserCacheReq contains data which mutates one or more user cache entries.
type UserCacheReq struct {
// Name of the node sending this request in case of cluster. Not set otherwise.
Expand Down

0 comments on commit 6655b76

Please sign in to comment.