Skip to content

Commit

Permalink
moar changes. still does not compile
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Nov 12, 2017
1 parent 6e1cb03 commit 619d2bc
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 214 deletions.
293 changes: 146 additions & 147 deletions pbx/model.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pbx/model.proto
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ message ServerPres {
What what = 3;
string user_agent = 4;
int32 seq_id = 5;
repeated int32 seq_list = 6;
repeated DelQuery del_seq = 6;
string target_user_id = 7;
string actor_user_id = 8;
AccessMode acs = 9;
Expand Down
8 changes: 1 addition & 7 deletions server/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ type MsgFindQuery struct {
Tags []string `json:"tags"`
}

// Range of ids from low to hi inclusive
type MsgSeqRange struct {
Low int `json:"low,omitempty"`
Hi int `json:"hi,omitempty"`
}

// Either an individual ID or a randge of deleted IDs
type MsgDelQuery struct {
SeqId int `json:"seq,omitempty"`
Expand Down Expand Up @@ -404,7 +398,7 @@ type MsgServerPres struct {
What string `json:"what"`
UserAgent string `json:"ua,omitempty"`
SeqId int `json:"seq,omitempty"`
SeqList []int `json:"list,omitempty"`
DelSeq []MsgDelQuery `json:"delseq,omitempty"`
AcsTarget string `json:"tgt,omitempty"`
AcsActor string `json:"act,omitempty"`
Acs *MsgAccessMode `json:"acs,omitempty"`
Expand Down
4 changes: 2 additions & 2 deletions server/pbconverter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func pb_serv_serialize(msg *ServerComMessage) *pbx.ServerMsg {
What: what,
UserAgent: msg.Pres.UserAgent,
SeqId: int32(msg.Pres.SeqId),
SeqList: intSliceToInt32(msg.Pres.SeqList),
DelSeq: pb_DelQuery_serialize(msg.Pres.DelSeq),
TargetUserId: msg.Pres.AcsTarget,
ActorUserId: msg.Pres.AcsActor,
Acs: pb_AccessMode_serialize(msg.Pres.Acs)}}
Expand Down Expand Up @@ -143,7 +143,7 @@ func pb_serv_deserialize(pkt *pbx.ServerMsg) *ServerComMessage {
What: what,
UserAgent: pres.GetUserAgent(),
SeqId: int(pres.GetSeqId()),
SeqList: int32SliceToInt(pres.GetSeqList()),
DelSeq: pb_DelQuery_deserialize(pres.GetDelSeq()),
AcsTarget: pres.GetTargetUserId(),
AcsActor: pres.GetActorUserId(),
Acs: pb_AccessMode_deserialize(pres.GetAcs()),
Expand Down
18 changes: 9 additions & 9 deletions server/pres.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
type PresParams struct {
userAgent string
seqId int
seqList []int
delSeq []MsgDelQuery

// Uid who performed the action
actor string
Expand Down Expand Up @@ -202,7 +202,7 @@ func (t *Topic) presSubsOnline(what, src string, params *PresParams,
globals.hub.route <- &ServerComMessage{
Pres: &MsgServerPres{Topic: t.x_original, What: what, Src: src,
Acs: params.packAcs(), AcsActor: actor, AcsTarget: target,
SeqId: params.seqId, SeqList: params.seqList, filter: int(filter), singleUser: singleUser},
SeqId: params.seqId, DelSeq: params.delSeq, filter: int(filter), singleUser: singleUser},
rcptto: t.name, skipSid: skipSid}

// log.Printf("Pres K.2, L.3, W.2: topic'%s' what='%s', who='%s', acs='w:%s/g:%s'", t.name, what,
Expand Down Expand Up @@ -268,7 +268,7 @@ func (t *Topic) presSubsOffline(what string, params *PresParams, filter types.Ac
globals.hub.route <- &ServerComMessage{
Pres: &MsgServerPres{Topic: "me", What: what, Src: t.original(uid),
Acs: params.packAcs(), AcsActor: actor, AcsTarget: target,
SeqId: params.seqId, SeqList: params.seqList,
SeqId: params.seqId, DelSeq: params.delSeq,
skipTopic: skipTopic},
rcptto: user, skipSid: skipSid}
}
Expand Down Expand Up @@ -305,7 +305,7 @@ func presSubsOfflineOffline(topic string, cat types.TopicCat, subs []types.Subsc
globals.hub.route <- &ServerComMessage{
Pres: &MsgServerPres{Topic: "me", What: what, Src: original,
Acs: params.packAcs(), AcsActor: actor, AcsTarget: target,
SeqId: params.seqId, SeqList: params.seqList},
SeqId: params.seqId, DelSeq: params.delSeq},
rcptto: user, skipSid: skipSid}
}
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func (t *Topic) presSingleUserOffline(uid types.Uid, what string, params *PresPa

globals.hub.route <- &ServerComMessage{
Pres: &MsgServerPres{Topic: "me", What: what,
Src: t.original(uid), SeqId: params.seqId, SeqList: params.seqList,
Src: t.original(uid), SeqId: params.seqId, DelSeq: params.delSeq,
Acs: params.packAcs(), AcsActor: actor, AcsTarget: target, UserAgent: params.userAgent,
wantReply: strings.HasPrefix(what, "?unkn"), skipTopic: skipTopic},
rcptto: user, skipSid: skipSid}
Expand All @@ -362,7 +362,7 @@ func presSingleUserOfflineOffline(uid types.Uid, original string, what string,

globals.hub.route <- &ServerComMessage{
Pres: &MsgServerPres{Topic: "me", What: what,
Src: original, SeqId: params.seqId, SeqList: params.seqList,
Src: original, SeqId: params.seqId, DelSeq: params.delSeq,
Acs: params.packAcs(), AcsActor: actor, AcsTarget: target},
rcptto: uid.UserId(), skipSid: skipSid}
}
Expand Down Expand Up @@ -391,15 +391,15 @@ func (t *Topic) presPubMessageCount(uid types.Uid, recv, read int, skip string)

// Let other sessions of a given user know that what messages are now deleted
// Cases V.1, V.2
func (t *Topic) presPubMessageDelete(uid types.Uid, list []int, clear int, skip string) {
if clear > 0 || (len(list) > 0 && list[0] > 0) {
func (t *Topic) presPubMessageDelete(uid types.Uid, list []MsgDelQuery, skip string) {
if len(list) > 0 {
// This check is only needed for V.1, but it does not hurt V.2. Let's do it here for both.
pud, _ := t.perUser[uid]
if !(pud.modeGiven & pud.modeWant).IsPresencer() {
return
}

params := &PresParams{seqId: clear, seqList: list}
params := &PresParams{delSeq: list}

// Case V.2
user := uid.UserId()
Expand Down
1 change: 0 additions & 1 deletion server/store/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ type Adapter interface {
// Messages
MessageSave(msg *t.Message) error
MessageGetAll(topic string, forUser t.Uid, opts *t.BrowseOpt) ([]t.Message, error)
MessageDeleteAll(topic string, before int) error
MessageDeleteList(topic string, forUser t.Uid, hard bool, list []int) error

// Devices (for push notifications)
Expand Down
14 changes: 4 additions & 10 deletions server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (TopicsObjMapper) Delete(topic string) error {
if err := adaptr.SubsDelForTopic(topic); err != nil {
return err
}
if err := adaptr.MessageDeleteAll(topic, -1); err != nil {
if err := adaptr.MessageDeleteList(topic, types.ZeroUid, true, nil); err != nil {
return err
}

Expand Down Expand Up @@ -354,8 +354,9 @@ func (MessagesObjMapper) Save(msg *types.Message) error {
return adaptr.MessageSave(msg)
}

// Delete messages. Hard-delete if hard == tru, otherwise a soft-delete
func (MessagesObjMapper) Delete(topic string, forUser types.Uid, hard bool, cleared int) (err error) {
func (MessagesObjMapper) DeleteList(topic string, delId int, forUser types.Uid, hard bool, list []int) (err error) {
err = adaptr.MessageDeleteList(topic, forUser, hard, list)

if hard {
err = adaptr.MessageDeleteAll(topic, cleared)
if err != nil {
Expand All @@ -370,13 +371,6 @@ func (MessagesObjMapper) Delete(topic string, forUser types.Uid, hard bool, clea
update := map[string]interface{}{"ClearId": cleared}
err = adaptr.SubsUpdate(topic, forUser, update)
}

return
}

func (MessagesObjMapper) DeleteList(topic string, forUser types.Uid, hard bool, list []int) (err error) {
err = adaptr.MessageDeleteList(topic, forUser, hard, list)

return err
}

Expand Down
94 changes: 57 additions & 37 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
// "encoding/json"
"errors"
"log"
"sort"
"sync/atomic"
"time"

Expand All @@ -24,7 +25,7 @@ import (
const UA_TIMER_DELAY = time.Second * 5

// Maximum number of SeqIds to pass in a list
const MAX_SEQ_COUNT = 128
const MAX_SEQ_COUNT = 256

// Topic: an isolated communication channel
type Topic struct {
Expand Down Expand Up @@ -1615,7 +1616,7 @@ func (t *Topic) replyDelMsg(sess *Session, del *MsgClientDel) error {
var err error
var filteredList []int
if del.DelSeq == nil || len(del.DelSeq) == 0 {
err = error.New("del.msg: not IDs to delete")
err = errors.New("del.msg: not IDs to delete")
} else {
remains := MAX_SEQ_COUNT
for _, dq := range del.DelSeq {
Expand Down Expand Up @@ -1645,6 +1646,31 @@ func (t *Topic) replyDelMsg(sess *Session, del *MsgClientDel) error {
}
}

sort.Ints(filteredList)
ll := len(filteredList)
if ll > 1 {
p := 0
// Remove zeros
for i := 0; i < ll && filteredList[i] == 0; i++ {
p++
}
if p > 0 {
filteredList = filteredList[p:]
p = 0
}
// Remove duplicates
for i := 1; i < ll; i++ {
if filteredList[p] == filteredList[i] {
continue
}
p++
if p < i {
filteredList[p] = filteredList[i]
}
}
filteredList = filteredList[:p+1]
}

if len(filteredList) == 0 {
err = errors.New("del.msg: no valid entries in list")
}
Expand All @@ -1669,53 +1695,51 @@ func (t *Topic) replyDelMsg(sess *Session, del *MsgClientDel) error {
del.Hard = false
}

if del.Before > 0 {
// Make sure user has not deleted the messages already
if (del.Before <= t.clearId) || (!del.Hard && del.Before <= pud.clearId) {
sess.queueOut(InfoNoAction(del.Id, t.original(sess.uid), now))
return nil
}

err = store.Messages.Delete(t.name, sess.uid, del.Hard, del.Before)
} else {
// del.List != nil

err = store.Messages.DeleteList(t.name, sess.uid, del.Hard, filteredList)
}

err = store.Messages.DeleteList(t.name, t.delId+1, sess.uid, del.Hard, filteredList)
if err != nil {
sess.queueOut(ErrUnknown(del.Id, t.original(sess.uid), now))
return err
}

var params *PresParams
if del.Before > 0 {
if del.Hard {
t.clearId = del.Before
params = &PresParams{seqId: del.Before, actor: sess.uid.UserId()}
// Increment Delete transaction ID
t.delId++

// Convert a list of IDs into IDs and ranges
ranges := []MsgDelQuery{{SeqId: filteredList[0]}}
for r, i := 0, 1; i < len(filteredList); i++ {
if ranges[r].SeqId+1 == filteredList[i] {
// Convert single ID into a range of IDs
ranges[r].LowId = ranges[r].SeqId
ranges[r].SeqId = 0
ranges[r].HiId = filteredList[i]
} else if ranges[r].HiId+1 == filteredList[i] {
// Extend current range
ranges[r].HiId++
} else {
pud.clearId = del.Before
if pud.readId < pud.clearId {
pud.readId = pud.clearId
}
if pud.recvId < pud.readId {
pud.recvId = pud.readId
}
t.perUser[sess.uid] = pud
// Start new range
r++
ranges = append(ranges, MsgDelQuery{SeqId: filteredList[i]})
}
} else if del.Hard {
params = &PresParams{seqList: filteredList, actor: sess.uid.UserId()}
}

if del.Hard {
log.Println("hard delete")
for uid, pud := range t.perUser {
pud.delId = t.delId
t.perUser[uid] = pud
}
// Broadcast the change to all, online and offline, exclude the session making the change.
params := &PresParams{delSeq: ranges, actor: sess.uid.UserId()}
t.presSubsOnline("del", "", params, types.ModeRead, sess.sid, "")
t.presSubsOffline("del", params, types.ModeRead, sess.sid, true)
} else {
log.Println("soft delete")
pud := t.perUser[sess.uid]
pud.delId = t.delId
t.perUser[sess.uid] = pud

// Notify user's other sessions
t.presPubMessageDelete(sess.uid, filteredList, del.Before, sess.sid)
t.presPubMessageDelete(sess.uid, ranges, sess.sid)
}

sess.queueOut(NoErr(del.Id, t.original(sess.uid), now))
Expand Down Expand Up @@ -2014,7 +2038,7 @@ func getDefaultAccess(cat types.TopicCat, auth bool) types.AccessMode {
}

// Takes get.data parameters and ClearID, returns database query parameters
func msgOpts2storeOpts(req *MsgBrowseOpts, clearId int) *types.BrowseOpt {
func msgOpts2storeOpts(req *MsgBrowseOpts, delId int) *types.BrowseOpt {
var opts *types.BrowseOpt
if req != nil || clearId > 0 {
opts = &types.BrowseOpt{}
Expand All @@ -2023,10 +2047,6 @@ func msgOpts2storeOpts(req *MsgBrowseOpts, clearId int) *types.BrowseOpt {
if req.SinceId != 0 || req.BeforeId != 0 {
opts.Since = req.SinceId
opts.Before = req.BeforeId
} else if req.SinceTs != nil || req.BeforeTs != nil {
opts.ByTime = true
opts.After = req.SinceTs
opts.Until = req.BeforeTs
}
}
if clearId > opts.Since {
Expand Down

0 comments on commit 619d2bc

Please sign in to comment.