Skip to content

Commit

Permalink
Merge branch 'master' into mongodb_adp
Browse files Browse the repository at this point in the history
  • Loading branch information
Googlom committed Nov 18, 2019
2 parents 0b7a423 + d4bc44f commit 125f048
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 12 deletions.
3 changes: 3 additions & 0 deletions INSTALL.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ See [instructions](./docker/README.md)

5. Test your installation by pointing your browser to [http://localhost:6060/](http://localhost:6060/). The static files from the `-static_data` path are served at web root `/`. You can change this by editing the line `static_mount` in the config file.

If you are running Tinode alongside another webserver, such as Apache or nginx, keep in mind that you need to launch the webapp from the URL served by Tinode. Otherwise it won't work.


## Running a Cluster

- Install and run the database, run DB initializer, unpack JS files as described in the previous section. Both MySQL and RethinkDB supports [cluster](https://www.mysql.com/products/cluster/) [mode](https://www.rethinkdb.com/docs/start-a-server/#a-rethinkdb-cluster-using-multiple-machines). You may consider it for added resiliency.
Expand Down
3 changes: 1 addition & 2 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,7 @@ func (c *Cluster) routeToTopic(msg *ClientComMessage, topic string, sess *Sessio
}

if sess.getRemoteSub(topic) == nil {
log.Printf("Remote subscription missing for topic '%s', sid '%s'", topic, sess.sid)
sess.addRemoteSub(topic, &RemoteSubscription{node: n.name})
log.Printf("No remote subscription (yet) for topic '%s', sid '%s'", topic, sess.sid)
}

req := &ClusterReq{
Expand Down
3 changes: 3 additions & 0 deletions server/db/mysql/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1864,12 +1864,15 @@ func (a *adapter) FindTopics(req, opt []string) ([]t.Subscription, error) {

// Messages
func (a *adapter) MessageSave(msg *t.Message) error {
// store assignes message ID, but we don't use it. Message IDs are not used anywhere.
// Using a sequential ID provided by the database.
res, err := a.db.Exec(
"INSERT INTO messages(createdAt,updatedAt,seqid,topic,`from`,head,content) VALUES(?,?,?,?,?,?,?)",
msg.CreatedAt, msg.UpdatedAt, msg.SeqId, msg.Topic,
store.DecodeUid(t.ParseUid(msg.From)), msg.Head, toJSON(msg.Content))
if err == nil {
id, _ := res.LastInsertId()
// Replacing ID given by store by ID given by the DB.
msg.SetUid(t.Uid(id))
}
return err
Expand Down
2 changes: 0 additions & 2 deletions server/db/rethinkdb/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,6 @@ func (a *adapter) FindTopics(req, opt []string) ([]t.Subscription, error) {

// Messages
func (a *adapter) MessageSave(msg *t.Message) error {
msg.SetUid(store.GetUid())
_, err := rdb.DB(a.dbName).Table("messages").Insert(msg).RunWrite(a.conn)
return err
}
Expand Down Expand Up @@ -1653,7 +1652,6 @@ func (a *adapter) MessageDeleteList(topic string, toDel *t.DelMessage) error {
err = a.messagesHardDelete(topic)
} else {
// Only some messages are being deleted
toDel.SetUid(store.GetUid())

// Start with making a log entry
_, err = rdb.DB(a.dbName).Table("dellog").Insert(toDel).RunWrite(a.conn)
Expand Down
52 changes: 52 additions & 0 deletions server/http_pprof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Debug tooling. Dumps named profile in response to HTTP request at
// http(s)://<host-name>/<configured-path>/<profile-name>
// See godoc for the list of possible profile names: https://golang.org/pkg/runtime/pprof/#Profile

package main

import (
"fmt"
"log"
"net/http"
"path"
"runtime/pprof"
"strings"
)

var pprofHttpRoot string

// Expose debug profiling at the given URL path.
func servePprof(mux *http.ServeMux, serveAt string) {
if serveAt == "" || serveAt == "-" {
return
}

pprofHttpRoot = path.Clean("/"+serveAt) + "/"
mux.HandleFunc(pprofHttpRoot, profileHandler)

log.Printf("pprof: profiling info exposed at '%s'", pprofHttpRoot)
}

func profileHandler(wrt http.ResponseWriter, req *http.Request) {
wrt.Header().Set("X-Content-Type-Options", "nosniff")
wrt.Header().Set("Content-Type", "text/plain; charset=utf-8")

profileName := strings.TrimPrefix(req.URL.Path, pprofHttpRoot)

profile := pprof.Lookup(profileName)
if profile == nil {
servePprofError(wrt, http.StatusNotFound, "Unknown profile '"+profileName+"'")
return
}

// Respond with the requested profile.
profile.WriteTo(wrt, 2)
}

func servePprofError(wrt http.ResponseWriter, status int, txt string) {
wrt.Header().Set("Content-Type", "text/plain; charset=utf-8")
wrt.Header().Set("X-Go-Pprof", "1")
wrt.Header().Del("Content-Disposition")
wrt.WriteHeader(status)
fmt.Fprintln(wrt, txt)
}
5 changes: 4 additions & 1 deletion server/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ func (h *Hub) run() {
// Check if 'sys' topic has migrated to this node.
if h.topicGet("sys") == nil && !globals.cluster.isRemoteTopic("sys") {
// Yes, 'sys' has migrated here. Initialize it.
h.join <- &sessionJoin{topic: "sys", internal: true, pkt: &ClientComMessage{topic: "sys"}}
// The h.join is unbuffered. We must call from another goroutine. Otherwise deadlock.
go func() {
h.join <- &sessionJoin{topic: "sys", internal: true, pkt: &ClientComMessage{topic: "sys"}}
}()
}

case hubdone := <-h.shutdown:
Expand Down
9 changes: 7 additions & 2 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,14 @@ func main() {

var configfile = flag.String("config", "tinode.conf", "Path to config file.")
// Path to static content.
var staticPath = flag.String("static_data", defaultStaticPath, "Path to directory with static files to be served.")
var staticPath = flag.String("static_data", defaultStaticPath, "File path to directory with static files to be served.")
var listenOn = flag.String("listen", "", "Override address and port to listen on for HTTP(S) clients.")
var listenGrpc = flag.String("grpc_listen", "", "Override address and port to listen on for gRPC clients.")
var tlsEnabled = flag.Bool("tls_enabled", false, "Override config value for enabling TLS.")
var clusterSelf = flag.String("cluster_self", "", "Override the name of the current cluster node.")
var expvarPath = flag.String("expvar", "", "Override the path where runtime stats are exposed.")
var expvarPath = flag.String("expvar", "", "Override the URL path where runtime stats are exposed. Use '-' to disable.")
var pprofFile = flag.String("pprof", "", "File name to save profiling info to. Disabled if not set.")
var pprofUrl = flag.String("pprof_url", "", "Debugging only! URL path for exposing profiling info. Disabled if not set.")
flag.Parse()

*configfile = toAbsolutePath(rootpath, *configfile)
Expand All @@ -265,6 +266,7 @@ func main() {
// Set up HTTP server. Must use non-default mux because of expvar.
mux := http.NewServeMux()

// Exposing values for statistics and monitoring.
evpath := *expvarPath
if evpath == "" {
evpath = config.ExpvarPath
Expand All @@ -273,6 +275,9 @@ func main() {
statsRegisterInt("Version")
statsSet("Version", int64(parseVersion(currentVersion)))

// Initialize serving debug profiles (optional).
servePprof(mux, *pprofUrl)

// Initialize cluster and receive calculated workerId.
// Cluster won't be started here yet.
workerId := clusterInit(config.Cluster, clusterSelf)
Expand Down
18 changes: 15 additions & 3 deletions server/push/fcm/push_fcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,26 @@ func sendNotifications(rcpt *push.Receipt, config *configType) {
} else if d.Platform == "ios" {
// iOS uses Badge to show the total unread message count.
badge := rcpt.To[uid].Unread
// Need to duplicate these in APNS.Payload.Aps.Alert so
// iOS may call NotificationServiceExtension (if present).
title := "New message"
body := data["content"]
msg.APNS = &fcm.APNSConfig{
Payload: &fcm.APNSPayload{
Aps: &fcm.Aps{Badge: &badge},
Aps: &fcm.Aps{
Badge: &badge,
MutableContent: true,
Sound: "default",
Alert: &fcm.ApsAlert{
Title: title,
Body: body,
},
},
},
}
msg.Notification = &fcm.Notification{
Title: "New message",
Body: data["content"],
Title: title,
Body: body,
}
}

Expand Down
3 changes: 2 additions & 1 deletion server/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ var Messages MessagesObjMapper
// Save message
func (MessagesObjMapper) Save(msg *types.Message, readBySender bool) error {
msg.InitTimes()

msg.SetUid(GetUid())
// Increment topic's or user's SeqId
err := adp.TopicUpdateOnMessage(msg.Topic, msg)
if err != nil {
Expand Down Expand Up @@ -571,6 +571,7 @@ func (MessagesObjMapper) DeleteList(topic string, delID int, forUser types.Uid,
DelId: delID,
DeletedFor: forUser.String(),
SeqIdRanges: ranges}
toDel.SetUid(GetUid())
toDel.InitTimes()
}

Expand Down
2 changes: 1 addition & 1 deletion server/tinode.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
// Maximum number of indexable tags per topic or user.
"max_tag_count": 16,

// URL path for exposing runtime stats. Disabled if the path is blank.
// URL path for exposing runtime stats. Disabled if the path is blank or "-".
// Could be overriden from the command line with --expvar.
"expvar": "/debug/vars",

Expand Down

0 comments on commit 125f048

Please sign in to comment.