Skip to content

Commit

Permalink
Merge branch 'devel' of https://github.com/tinode/chat into devel
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed May 2, 2020
2 parents 6df29db + 3bb86be commit 57b2952
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 53 deletions.
4 changes: 2 additions & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (c *Cluster) Master(msg *ClusterReq, rejected *bool) error {
sess.platf = msg.Sess.Platform

// Dispatch remote message to a local session.
msg.CliMsg.from = msg.OnBehalfOf
msg.CliMsg.asUser = msg.OnBehalfOf
msg.CliMsg.authLvl = msg.AuthLvl
sess.dispatch(msg.CliMsg)
} else {
Expand Down Expand Up @@ -605,7 +605,7 @@ func (c *Cluster) routeToTopic(msg *ClientComMessage, topic string, sess *Sessio

if sess.authLvl == auth.LevelRoot {
// Assign these values only when the sender is root
req.OnBehalfOf = msg.from
req.OnBehalfOf = msg.asUser
req.AuthLvl = msg.authLvl
}

Expand Down
6 changes: 3 additions & 3 deletions server/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ type ClientComMessage struct {
// Un-routable (original) topic name denormalized from XXX.Topic.
topic string
// Sender's UserId as string
from string
asUser string
// Sender's authentication level
authLvl int
// Timestamp when this message was received by the server
Expand Down Expand Up @@ -578,7 +578,7 @@ type ServerComMessage struct {
// timestamp for consistency of timestamps in {ctrl} messages
timestamp time.Time
// User ID of the sender of the original message.
from string
asUser string
// Originating session to send an aknowledgement to. Could be nil.
sess *Session
// Should the packet be sent to the original session? SessionID to skip.
Expand All @@ -595,7 +595,7 @@ func (src *ServerComMessage) copy() *ServerComMessage {
id: src.id,
rcptto: src.rcptto,
timestamp: src.timestamp,
from: src.from,
asUser: src.asUser,
sess: src.sess,
skipSid: src.skipSid,
}
Expand Down
16 changes: 8 additions & 8 deletions server/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea
now := time.Now().UTC().Round(time.Millisecond)

if reason == StopDeleted {
asUid := types.ParseUserId(msg.from)
asUid := types.ParseUserId(msg.asUser)
// Case 1 (unregister and delete)
if t := h.topicGet(topic); t != nil {
// Case 1.1: topic is online
Expand Down Expand Up @@ -389,7 +389,7 @@ func (h *Hub) topicUnreg(sess *Session, topic string, msg *ClientComMessage, rea
} else {
// Case 1.2: topic is offline.

asUid := types.ParseUserId(msg.from)
asUid := types.ParseUserId(msg.asUser)
// Get all subscribers: we need to know how many are left and notify them.
subs, err := store.Topics.GetSubs(topic, nil)
if err != nil {
Expand Down Expand Up @@ -535,7 +535,7 @@ func (h *Hub) stopTopicsForUser(uid types.Uid, reason int, alldone chan<- bool)
func replyOfflineTopicGetDesc(sess *Session, topic string, msg *ClientComMessage) {
now := types.TimeNow()
desc := &MsgTopicDesc{}
asUid := types.ParseUserId(msg.from)
asUid := types.ParseUserId(msg.asUser)

if strings.HasPrefix(topic, "grp") {
stopic, err := store.Topics.Get(topic)
Expand All @@ -552,7 +552,7 @@ func replyOfflineTopicGetDesc(sess *Session, topic string, msg *ClientComMessage
desc.CreatedAt = &stopic.CreatedAt
desc.UpdatedAt = &stopic.UpdatedAt
desc.Public = stopic.Public
if stopic.Owner == msg.from {
if stopic.Owner == msg.asUser {
desc.DefaultAcs = &MsgDefaultAcsMode{
Auth: stopic.Access.Auth.String(),
Anon: stopic.Access.Anon.String()}
Expand Down Expand Up @@ -624,12 +624,12 @@ func replyOfflineTopicGetDesc(sess *Session, topic string, msg *ClientComMessage
func replyOfflineTopicGetSub(sess *Session, topic string, msg *ClientComMessage) {
now := types.TimeNow()

if msg.Get.Sub != nil && msg.Get.Sub.User != "" && msg.Get.Sub.User != msg.from {
if msg.Get.Sub != nil && msg.Get.Sub.User != "" && msg.Get.Sub.User != msg.asUser {
sess.queueOut(ErrPermissionDenied(msg.id, msg.topic, now))
return
}

ssub, err := store.Subs.Get(topic, types.ParseUserId(msg.from))
ssub, err := store.Subs.Get(topic, types.ParseUserId(msg.asUser))
if err != nil {
log.Println("replyOfflineTopicGetSub:", err)
sess.queueOut(decodeStoreError(err, msg.id, msg.topic, now, nil))
Expand Down Expand Up @@ -673,12 +673,12 @@ func replyOfflineTopicSetSub(sess *Session, topic string, msg *ClientComMessage)
return
}

if msg.Set.Sub != nil && msg.Set.Sub.User != "" && msg.Set.Sub.User != msg.from {
if msg.Set.Sub != nil && msg.Set.Sub.User != "" && msg.Set.Sub.User != msg.asUser {
sess.queueOut(ErrPermissionDenied(msg.id, msg.topic, now))
return
}

asUid := types.ParseUserId(msg.from)
asUid := types.ParseUserId(msg.asUser)

sub, err := store.Subs.Get(topic, asUid)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions server/init_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func initTopicP2P(t *Topic, sreg *sessionJoin) error {

// Fetching records for both users.
// Requester.
userID1 := types.ParseUserId(sreg.pkt.from)
userID1 := types.ParseUserId(sreg.pkt.asUser)
// The other user.
userID2 := types.ParseUserId(t.xoriginal)
// User index: u1 - requester, u2 - responder, the other user
Expand Down Expand Up @@ -463,7 +463,7 @@ func initTopicNewGrp(t *Topic, sreg *sessionJoin) error {
t.cat = types.TopicCatGrp

// Generic topics have parameters stored in the topic object
t.owner = types.ParseUserId(sreg.pkt.from)
t.owner = types.ParseUserId(sreg.pkt.asUser)

t.accessAuth = getDefaultAccess(t.cat, true)
t.accessAnon = getDefaultAccess(t.cat, false)
Expand Down
4 changes: 2 additions & 2 deletions server/pbconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func pbCliSerialize(msg *ClientComMessage) *pbx.ClientMsg {
return nil
}

pkt.OnBehalfOf = msg.from
pkt.OnBehalfOf = msg.asUser
pkt.AuthLevel = pbx.AuthLevel(msg.authLvl)

return &pkt
Expand Down Expand Up @@ -409,7 +409,7 @@ func pbCliDeserialize(pkt *pbx.ClientMsg) *ClientComMessage {
}
}

msg.from = pkt.GetOnBehalfOf()
msg.asUser = pkt.GetOnBehalfOf()
msg.authLvl = int(pkt.GetAuthLevel())

return &msg
Expand Down
32 changes: 16 additions & 16 deletions server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,17 +275,17 @@ func (s *Session) dispatch(msg *ClientComMessage) {
s.lastAction = types.TimeNow()
msg.timestamp = s.lastAction

if msg.from == "" {
msg.from = s.uid.UserId()
if msg.asUser == "" {
msg.asUser = s.uid.UserId()
msg.authLvl = int(s.authLvl)
} else if s.authLvl != auth.LevelRoot {
// Only root user can set non-default msg.from && msg.authLvl values.
s.queueOut(ErrPermissionDenied("", "", msg.timestamp))
log.Println("s.dispatch: non-root asigned msg.from", s.sid)
return
} else if fromUid := types.ParseUserId(msg.from); fromUid.IsZero() {
} else if fromUid := types.ParseUserId(msg.asUser); fromUid.IsZero() {
s.queueOut(ErrMalformed("", "", msg.timestamp))
log.Println("s.dispatch: malformed msg.from: ", msg.from, s.sid)
log.Println("s.dispatch: malformed msg.from: ", msg.asUser, s.sid)
return
}

Expand Down Expand Up @@ -317,7 +317,7 @@ func (s *Session) dispatch(msg *ClientComMessage) {
// Check if user is logged in
checkUser := func(m *ClientComMessage, handler func(*ClientComMessage)) func(*ClientComMessage) {
return func(m *ClientComMessage) {
if msg.from == "" {
if msg.asUser == "" {
log.Println("s.dispatch: authentication required", s.sid)
s.queueOut(ErrAuthRequired(m.id, m.topic, msg.timestamp))
return
Expand Down Expand Up @@ -395,8 +395,8 @@ func (s *Session) dispatch(msg *ClientComMessage) {
handler(msg)

// Notify 'me' topic that this session is currently active
if uaRefresh && msg.from != "" && s.userAgent != "" {
if sub := s.getSub(msg.from); sub != nil {
if uaRefresh && msg.asUser != "" && s.userAgent != "" {
if sub := s.getSub(msg.asUser); sub != nil {
// The chan is buffered. If the buffer is exhaused, the session will wait for 'me' to become available
sub.uaChange <- s.userAgent
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func (s *Session) leave(msg *ClientComMessage) {
// Unlink from topic, topic will send a reply.
s.delSub(expanded)
sub.done <- &sessionLeave{
userId: types.ParseUserId(msg.from),
userId: types.ParseUserId(msg.asUser),
topic: msg.topic,
sess: s,
unsub: msg.Leave.Unsub,
Expand Down Expand Up @@ -503,7 +503,7 @@ func (s *Session) publish(msg *ClientComMessage) {
}

// Add "sender" header if the message is sent on behalf of another user.
if msg.from != s.uid.UserId() {
if msg.asUser != s.uid.UserId() {
if msg.Pub.Head == nil {
msg.Pub.Head = make(map[string]interface{})
}
Expand All @@ -518,7 +518,7 @@ func (s *Session) publish(msg *ClientComMessage) {

data := &ServerComMessage{Data: &MsgServerData{
Topic: msg.topic,
From: msg.from,
From: msg.asUser,
Timestamp: msg.timestamp,
Head: msg.Pub.Head,
Content: msg.Pub.Content},
Expand All @@ -527,7 +527,7 @@ func (s *Session) publish(msg *ClientComMessage) {
sess: s,
id: msg.id,
timestamp: msg.timestamp,
from: msg.from}
asUser: msg.asUser}
if msg.Pub.NoEcho {
data.skipSid = s.sid
}
Expand Down Expand Up @@ -980,7 +980,7 @@ func (s *Session) del(msg *ClientComMessage) {
// Not reporting any errors
func (s *Session) note(msg *ClientComMessage) {

if s.ver == 0 || msg.from == "" {
if s.ver == 0 || msg.asUser == "" {
// Silently ignore the message: have not received {hi} or don't know who sent the message.
return
}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func (s *Session) note(msg *ClientComMessage) {
// Pings can be sent to subscribed topics only
sub.broadcast <- &ServerComMessage{Info: &MsgServerInfo{
Topic: msg.topic,
From: msg.from,
From: msg.asUser,
What: msg.Note.What,
SeqId: msg.Note.SeqId,
}, rcptto: expanded, timestamp: msg.timestamp, skipSid: s.sid}
Expand All @@ -1034,12 +1034,12 @@ func (s *Session) expandTopicName(msg *ClientComMessage) (string, *ServerComMess
// Expanded name of the topic to route to i.e. rcptto: or s.subs[routeTo]
var routeTo string
if msg.topic == "me" {
routeTo = msg.from
routeTo = msg.asUser
} else if msg.topic == "fnd" {
routeTo = types.ParseUserId(msg.from).FndName()
routeTo = types.ParseUserId(msg.asUser).FndName()
} else if strings.HasPrefix(msg.topic, "usr") {
// p2p topic
uid1 := types.ParseUserId(msg.from)
uid1 := types.ParseUserId(msg.asUser)
uid2 := types.ParseUserId(msg.topic)
if uid2.IsZero() {
// Ensure the user id is valid
Expand Down
36 changes: 18 additions & 18 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (t *Topic) run(hub *Hub) {
// Request to add a connection to this topic

if t.isInactive() {
asUid := types.ParseUserId(sreg.pkt.from)
asUid := types.ParseUserId(sreg.pkt.asUser)
sreg.sess.queueOut(ErrLocked(sreg.pkt.id, t.original(asUid), types.TimeNow()))
} else {
// The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die
Expand Down Expand Up @@ -282,7 +282,7 @@ func (t *Topic) run(hub *Hub) {
// Content message intended for broadcasting to recipients

var pushRcpt *push.Receipt
asUid := types.ParseUserId(msg.from)
asUid := types.ParseUserId(msg.asUser)
if msg.Data != nil {
if t.isInactive() {
msg.sess.queueOut(ErrLocked(msg.id, t.original(asUid), msg.timestamp))
Expand All @@ -293,8 +293,8 @@ func (t *Topic) run(hub *Hub) {
continue
}

from := types.ParseUserId(msg.Data.From)
userData, userFound := t.perUser[from]
asUser := types.ParseUserId(msg.Data.From)
userData, userFound := t.perUser[asUser]
// Anyone is allowed to post to 'sys' topic.
if t.cat != types.TopicCatSys {
// If it's not 'sys' check write permission.
Expand All @@ -309,7 +309,7 @@ func (t *Topic) run(hub *Hub) {
ObjHeader: types.ObjHeader{CreatedAt: msg.Data.Timestamp},
SeqId: t.lastID + 1,
Topic: t.name,
From: from.String(),
From: asUser.String(),
Head: msg.Data.Head,
Content: msg.Data.Content}, (userData.modeGiven & userData.modeWant).IsReader()); err != nil {

Expand All @@ -325,15 +325,15 @@ func (t *Topic) run(hub *Hub) {
if userFound {
userData.readID = t.lastID
userData.readID = t.lastID
t.perUser[from] = userData
t.perUser[asUser] = userData
}
if msg.id != "" {
reply := NoErrAccepted(msg.id, t.original(asUid), msg.timestamp)
reply.Ctrl.Params = map[string]int{"seq": t.lastID}
msg.sess.queueOut(reply)
}

pushRcpt = t.pushForData(from, msg.Data)
pushRcpt = t.pushForData(asUser, msg.Data)

// Message sent: notify offline 'R' subscrbers on 'me'
t.presSubsOffline("msg", &presParams{seqID: t.lastID, actor: msg.Data.From},
Expand Down Expand Up @@ -367,8 +367,8 @@ func (t *Topic) run(hub *Hub) {
continue
}

from := types.ParseUserId(msg.Info.From)
pud := t.perUser[from]
asUser := types.ParseUserId(msg.Info.From)
pud := t.perUser[asUser]

// Filter out "kp" from users with no 'W' permission (or people without a subscription)
if msg.Info.What == "kp" && (!(pud.modeGiven & pud.modeWant).IsWriter() || t.isReadOnly()) {
Expand Down Expand Up @@ -406,7 +406,7 @@ func (t *Topic) run(hub *Hub) {
recv = pud.recvID
}

if err := store.Subs.Update(t.name, from,
if err := store.Subs.Update(t.name, asUser,
map[string]interface{}{
"RecvSeqId": pud.recvID,
"ReadSeqId": pud.readID},
Expand All @@ -417,12 +417,12 @@ func (t *Topic) run(hub *Hub) {
}

// Read/recv updated: notify user's other sessions of the change
t.presPubMessageCount(from, recv, read, msg.skipSid)
t.presPubMessageCount(asUser, recv, read, msg.skipSid)

// Update cached count of unread messages
usersUpdateUnread(from, unread, true)
usersUpdateUnread(asUser, unread, true)

t.perUser[from] = pud
t.perUser[asUser] = pud
}
}

Expand Down Expand Up @@ -516,7 +516,7 @@ func (t *Topic) run(hub *Hub) {

case meta := <-t.meta:
// Request to get/set topic metadata
asUid := types.ParseUserId(meta.pkt.from)
asUid := types.ParseUserId(meta.pkt.asUser)
authLevel := auth.Level(meta.pkt.authLvl)
switch {
case meta.pkt.Get != nil:
Expand Down Expand Up @@ -625,7 +625,7 @@ func (t *Topic) run(hub *Hub) {
pssd.ref = nil
t.sessions[sreg.sess] = pssd
}
t.sendSubNotifications(types.ParseUserId(sreg.pkt.from), sreg)
t.sendSubNotifications(types.ParseUserId(sreg.pkt.asUser), sreg)
}

case <-uaTimer.C:
Expand Down Expand Up @@ -688,7 +688,7 @@ func (t *Topic) run(hub *Hub) {

// Session subscribed to a topic, created == true if topic was just created and {pres} needs to be announced
func (t *Topic) handleSubscription(h *Hub, sreg *sessionJoin) error {
asUid := types.ParseUserId(sreg.pkt.from)
asUid := types.ParseUserId(sreg.pkt.asUser)
authLevel := auth.Level(sreg.pkt.authLvl)

msgsub := sreg.pkt.Sub
Expand Down Expand Up @@ -855,7 +855,7 @@ func (t *Topic) subCommonReply(h *Hub, sreg *sessionJoin) error {
}

msgsub := sreg.pkt.Sub
asUid := types.ParseUserId(sreg.pkt.from)
asUid := types.ParseUserId(sreg.pkt.asUser)
asLvl := auth.Level(sreg.pkt.authLvl)
toriginal := t.original(asUid)

Expand Down Expand Up @@ -1876,7 +1876,7 @@ func (t *Topic) replyGetSub(sess *Session, asUid types.Uid, authLevel auth.Level
func (t *Topic) replySetSub(h *Hub, sess *Session, pkt *ClientComMessage) error {
now := types.TimeNow()

asUid := types.ParseUserId(pkt.from)
asUid := types.ParseUserId(pkt.asUser)
asLvl := auth.Level(pkt.authLvl)
set := pkt.Set
toriginal := t.original(asUid)
Expand Down
Loading

0 comments on commit 57b2952

Please sign in to comment.