Skip to content

Commit

Permalink
adding last message timestamp to subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Apr 20, 2018
1 parent 5443202 commit d1be998
Show file tree
Hide file tree
Showing 16 changed files with 80 additions and 64 deletions.
3 changes: 3 additions & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,9 @@ meta: {
updated: "2015-10-24T10:26:09.716Z", // timestamp of the last change in the
// subscription, present only for
// requester's own subscriptions
touched: "2017-11-02T09:13:55.530Z", // timestamp of the last message in the
// topic (may also include other events
// in the future, such as new subscribers)
acs: { // user's access permissions
want: "JRWP", // string, requested access permission, present for user's own
// subscriptions and when the requester is topic's manager or owner
Expand Down
30 changes: 20 additions & 10 deletions build-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,21 @@ goarc=( amd64 )
# Supported database tags
dbtags=( mysql rethinkdb )

pushd ${GOHOME}chat > /dev/null
for line in $@; do
eval "$line"
done

version=${tag#?}

if [ -z "$version" ]; then
# Get last git tag as release version. Tag looks like 'v.1.2.3', so strip 'v'.
version=`git tag | tail -1`
version=${version#?}
fi

GOSRC=${GOPATH}/src/github.com/tinode

# Get last git tag as release version. Tag looks like 'v.1.2.3', so strip 'v'.
version=`git tag | tail -1`
version=${version#?}
pushd ${GOSRC}/chat > /dev/null

# Prepare directory for the new release
rm -fR ./releases/${version}
Expand Down Expand Up @@ -84,7 +94,7 @@ do
# Remove possibly existing archive.
rm -f ./releases/${version}/tinode-${dbtag}."${plat}-${arc}".tar.gz
# Generate a new one
tar -C ${GOHOME}chat/releases/tmp -zcf ./releases/${version}/tinode-${dbtag}."${plat}-${arc}".tar.gz .
tar -C ${GOSRC}/chat/releases/tmp -zcf ./releases/${version}/tinode-${dbtag}."${plat}-${arc}".tar.gz .
fi
done
done
Expand All @@ -96,12 +106,12 @@ echo "Building chatbot..."
rm -fR ./releases/tmp
mkdir -p ./releases/tmp

cp ${GOHOME}chat/chatbot/chatbot.py ./releases/tmp
cp ${GOHOME}chat/chatbot/quotes.txt ./releases/tmp
cp ${GOHOME}chat/pbx/model_pb2.py ./releases/tmp
cp ${GOHOME}chat/pbx/model_pb2_grpc.py ./releases/tmp
cp ${GOSRC}/chat/chatbot/chatbot.py ./releases/tmp
cp ${GOSRC}/chat/chatbot/quotes.txt ./releases/tmp
cp ${GOSRC}/chat/pbx/model_pb2.py ./releases/tmp
cp ${GOSRC}/chat/pbx/model_pb2_grpc.py ./releases/tmp

tar -C ${GOHOME}chat/releases/tmp -zcf ./releases/${version}/chatbot.tar.gz .
tar -C ${GOSRC}/chat/releases/tmp -zcf ./releases/${version}/chatbot.tar.gz .
pushd ./releases/tmp > /dev/null
zip -q -r ../${version}/chatbot.zip ./*
popd > /dev/null
Expand Down
1 change: 0 additions & 1 deletion docker/tinode/config.template
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"store_config": {
"worker_id": 1,
"uid_key": "$UID_ENCRYPTION_KEY",
"use_adapter": "$TARGET_DB",
"adapters": {
"mysql": {
"database": "tinode",
Expand Down
7 changes: 6 additions & 1 deletion server/datamodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,9 @@ type MsgAccessMode struct {
type MsgTopicDesc struct {
CreatedAt *time.Time `json:"created,omitempty"`
UpdatedAt *time.Time `json:"updated,omitempty"`
// Timestamp of the last message
TouchedAt *time.Time `json:"touched,omitempty"`

// When a group topic is created, it's given a temporary name by the client.
// Then this name changes. Report the original name here.
TempName string `json:"tmpname,omitempty"`
Expand Down Expand Up @@ -354,11 +357,13 @@ type MsgTopicSub struct {
// Uid of the subscribed user
User string `json:"user,omitempty"`

// The following sections maks sense only in context of getting
// The following sections makes sense only in context of getting
// user's own subscriptions ('me' topic response)

// Topic name of this subscription
Topic string `json:"topic,omitempty"`
// Timestamp of the last message in the topic.
TouchedAt *time.Time `json:"touched,omitempty"`
// ID of the last {data} message in a topic
SeqId int `json:"seq,omitempty"`
// Id of the latest Delete operation
Expand Down
7 changes: 4 additions & 3 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ func (a *adapter) CreateDb(reset bool) error {
createdat DATETIME(3) NOT NULL,
updatedat DATETIME(3) NOT NULL,
deletedat DATETIME(3),
touchedat DATETIME(3),
name CHAR(25) NOT NULL,
usebt INT DEFAULT 0,
access JSON,
Expand Down Expand Up @@ -794,7 +795,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool) ([]t.Subscription,
if len(topq) > 0 {
// Fetch grp & p2p topics
q, _, _ := sqlx.In(
"SELECT createdat,updatedat,deletedat,name AS id,access,seqid,delid,public,tags "+
"SELECT createdat,updatedat,deletedat,touchedat,name AS id,access,seqid,delid,public,tags "+
"FROM topics WHERE name IN (?)", topq)
rows, err = a.db.Queryx(q, topq...)
if err != nil {
Expand All @@ -809,8 +810,8 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool) ([]t.Subscription,

sub = join[top.Id]
sub.ObjHeader.MergeTimes(&top.ObjHeader)
sub.SetTouchedAt(top.TouchedAt)
sub.SetSeqId(top.SeqId)
// sub.SetDelId(top.DelId)
if t.GetTopicCat(sub.Topic) == t.TopicCatGrp {
// all done with a grp topic
sub.SetPublic(fromJSON(top.Public))
Expand Down Expand Up @@ -919,7 +920,7 @@ func (a *adapter) TopicDelete(topic string) error {
}

func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error {
_, err := a.db.Exec("UPDATE topics SET seqid=? WHERE name=?", msg.SeqId, topic)
_, err := a.db.Exec("UPDATE topics SET seqid=?,touchedat=? WHERE name=?", msg.SeqId, msg.CreatedAt, topic)

return err
}
Expand Down
1 change: 1 addition & 0 deletions server/db/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ CREATE TABLE topics(
createdat DATETIME(3) NOT NULL,
updatedat DATETIME(3) NOT NULL,
deletedat DATETIME(3),
touchedat DATETIME(3),
name CHAR(25) NOT NULL,
usebt INT DEFAULT 0,
access JSON,
Expand Down
7 changes: 4 additions & 3 deletions server/db/rethinkdb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (a *adapter) TopicsForUser(uid t.Uid, keepDeleted bool) ([]t.Subscription,
sub = join[top.Id]
sub.ObjHeader.MergeTimes(&top.ObjHeader)
sub.SetSeqId(top.SeqId)
// sub.SetDelId(top.DelId)
sub.SetTouchedAt(top.TouchedAt)
if t.GetTopicCat(sub.Topic) == t.TopicCatGrp {
// all done with a grp topic
sub.SetPublic(top.Public)
Expand Down Expand Up @@ -673,8 +673,9 @@ func (a *adapter) TopicDelete(topic string) error {
func (a *adapter) TopicUpdateOnMessage(topic string, msg *t.Message) error {

update := struct {
SeqId int
}{msg.SeqId}
SeqId int
TouchedAt time.Time
}{msg.SeqId, msg.CreatedAt}

// FIXME(gene): remove 'me' update; no longer relevant
var err error
Expand Down
8 changes: 2 additions & 6 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ const (
// -ldflags "-X main.buildstamp=`date -u '+%Y%m%dT%H:%M:%SZ'`"
var buildstamp = "undefined"

// Database adapter that this binary was built for. Defined at compile time.
var builtfordb = ""

// CredValidator holds additional config params for a credential validator.
type credValidator struct {
// AuthLevel(s) which require this validator.
Expand Down Expand Up @@ -172,10 +169,9 @@ type configType struct {

func main() {
log.Printf("Server 'v%s:%s:%s'; pid %d; started with %d process(es)", currentVersion,
buildstamp, builtfordb, os.Getpid(), runtime.GOMAXPROCS(runtime.NumCPU()))
buildstamp, store.GetAdapterName(), os.Getpid(), runtime.GOMAXPROCS(runtime.NumCPU()))

var configfile = flag.String("config", "./tinode.conf", "Path to config file.")
var useAdapter = flag.String("store_use_adapter", builtfordb, "Override default database adapter")
// Path to static content.
var staticPath = flag.String("static_data", "", "Path to /static data for the server.")
var listenOn = flag.String("listen", "", "Override address and port to listen on for HTTP(S) clients.")
Expand All @@ -201,7 +197,7 @@ func main() {
// Cluster won't be started here yet.
workerId := clusterInit(config.Cluster, clusterSelf)

var err = store.Open(workerId, *useAdapter, string(config.Store))
var err = store.Open(workerId, string(config.Store))
if err != nil {
log.Fatal("Failed to connect to DB:", err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (s *Session) hello(msg *ClientComMessage) {
s.queueOut(ErrVersionNotSupported(msg.Hi.Id, "", msg.timestamp))
return
}
params = map[string]interface{}{"ver": currentVersion, "build": builtfordb + ":" + buildstamp}
params = map[string]interface{}{"ver": currentVersion, "build": store.GetAdapterName() + ":" + buildstamp}

} else if msg.Hi.Version == "" || parseVersion(msg.Hi.Version) == s.ver {
// Save changed device ID or Lang.
Expand Down
43 changes: 19 additions & 24 deletions server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,20 @@ var adp adapter.Adapter
var uGen types.UidGenerator

type configType struct {
// Name of the adapter to use.
UseAdapter string `json:"use_adapter"`
// 16-byte key for XTEA. Used to initialize types.UidGenerator
UidKey []byte `json:"uid_key"`
// Configurations for individual adapters.
Adapters map[string]json.RawMessage `json:"adapters"`
}

func openAdapter(workerId int, useAdapter, jsonconf string) error {
func openAdapter(workerId int, jsonconf string) error {
var config configType
if err := json.Unmarshal([]byte(jsonconf), &config); err != nil {
return errors.New("store: failed to parse config: " + err.Error() + "(" + jsonconf + ")")
}

if useAdapter == "" {
useAdapter = config.UseAdapter
}
adp = dbAdapters[useAdapter]
if adp == nil {
return errors.New("store: attept to Open an unknown adapter '" + useAdapter + "'")
return errors.New("store: database adapter is missing")
}

if adp.IsOpen() {
Expand All @@ -55,7 +49,7 @@ func openAdapter(workerId int, useAdapter, jsonconf string) error {

var adapter_config string
if config.Adapters != nil {
adapter_config = string(config.Adapters[useAdapter])
adapter_config = string(config.Adapters[adp.GetName()])
}

return adp.Open(adapter_config)
Expand All @@ -64,8 +58,8 @@ func openAdapter(workerId int, useAdapter, jsonconf string) error {
// Open initializes the persistence system. Adapter holds a connection pool for a database instance.
// name - name of the adapter rquested in the config file
// jsonconf - configuration string
func Open(workerId int, useAdapter, jsonconf string) error {
if err := openAdapter(workerId, useAdapter, jsonconf); err != nil {
func Open(workerId int, jsonconf string) error {
if err := openAdapter(workerId, jsonconf); err != nil {
return err
}
if err := adp.CheckDbVersion(); err != nil {
Expand All @@ -92,37 +86,38 @@ func IsOpen() bool {
return false
}

func GetAdapterName() string {
if adp != nil {
return adp.GetName()
}

return ""
}

// InitDb creates a new database instance. If 'reset' is true it will first attempt to drop
// existing database. If jsconf is nil it will assume that the connection is already open.
// If it's non-nil, it will use the config string to open the DB connection first.
func InitDb(useAdapter, jsonconf string, reset bool) error {
func InitDb(jsonconf string, reset bool) error {
if !IsOpen() {
if err := openAdapter(1, useAdapter, jsonconf); err != nil {
if err := openAdapter(1, jsonconf); err != nil {
return err
}
}
return adp.CreateDb(reset)
}

// Registered database adapters.
var dbAdapters map[string]adapter.Adapter

// Register makes a persistence adapter available by the provided name.
// Register makes a persistence adapter available.
// If Register is called twice or if the adapter is nil, it panics.
// Name is currently unused, i.e. only a single adapter can be registered
func RegisterAdapter(name string, a adapter.Adapter) {
if dbAdapters == nil {
dbAdapters = make(map[string]adapter.Adapter)
}
if a == nil {
panic("store: Register adapter is nil")
}

if _, dup := dbAdapters[name]; dup {
panic("store: duplicate registration of adapter " + name)
if adp != nil {
panic("store: adapter '" + adp.GetName() + "' is already registered")
}

dbAdapters[name] = a
adp = a
}

// GetUid generates a unique ID suitable for use as a primary key.
Expand Down
19 changes: 15 additions & 4 deletions server/store/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,8 +611,8 @@ type Subscription struct {
public interface{}
// deserialized SeqID from user or topic
seqId int
// Id of the last delete operation deserialized from user or topic
// delId int
// Deserialized TouchedAt from topic
touchedAt *time.Time
// timestamp when the user was last online
lastSeen time.Time
// user agent string of the last online access
Expand Down Expand Up @@ -644,6 +644,16 @@ func (s *Subscription) GetWith() string {
return s.with
}

// GetLastSeen returns lastSeen.
func (s *Subscription) GetTouchedAt() *time.Time {
return s.touchedAt
}

// GetSeqId returns seqId.
func (s *Subscription) SetTouchedAt(touchedAt *time.Time) {
s.touchedAt = touchedAt
}

// GetSeqId returns seqId.
func (s *Subscription) GetSeqId() int {
return s.seqId
Expand Down Expand Up @@ -697,11 +707,12 @@ type perUserData struct {
given AccessMode
}

// Topic stored in database
// Topic stored in database. Topic's name is Id
type Topic struct {
ObjHeader

// Name string -- topic name is stored in Id
// Timestamp when the last message has passed through the topic
TouchedAt *time.Time

// Use bearer token or use ACL
UseBt bool
Expand Down
3 changes: 0 additions & 3 deletions server/tinode.conf
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@
// Generate your own and keep it secret.
"uid_key": "la6YsO+bNX/+XIkOqc5Svw==",

// Name of the backend adapter to use.
"use_adapter": "rethinkdb",

// Configurations of individual adapters.
"adapters": {
// MySQL configuration.
Expand Down
1 change: 1 addition & 0 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,7 @@ func (t *Topic) replyGetSub(sess *Session, id string, opts *MsgGetOpts) error {

if !deleted && !banned {
if isReader {
mts.TouchedAt = sub.GetTouchedAt()
mts.SeqId = sub.GetSeqId()
mts.DelId = sub.DelId
}
Expand Down
4 changes: 2 additions & 2 deletions tinode-db/gendb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
"github.com/tinode/chat/server/store/types"
)

func genDb(reset bool, useAdapter, dbSource string, data *Data) {
func genDb(reset bool, dbSource string, data *Data) {
var err error

defer store.Close()

log.Println("Initializing DB...")

err = store.InitDb(useAdapter, dbSource, reset)
err = store.InitDb(dbSource, reset)
if err != nil {
if strings.Contains(err.Error(), " already exists") {
log.Println("DB already exists, NOT reinitializing")
Expand Down
Loading

0 comments on commit d1be998

Please sign in to comment.