diff --git a/go.mod b/go.mod index 5693a7b7f..ba6ff1625 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/prometheus/common v0.9.1 github.com/tinode/jsonco v1.0.0 github.com/tinode/snowflake v1.0.0 + go.etcd.io/bbolt v1.3.5 // indirect go.mongodb.org/mongo-driver v1.3.1 golang.org/x/crypto v0.0.0-20200320181102-891825fb96df golang.org/x/net v0.0.0-20200320220750-118fecf932d8 diff --git a/go.sum b/go.sum index 9c7f0e503..8b5368cd1 100644 --- a/go.sum +++ b/go.sum @@ -207,6 +207,8 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0= +go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.mongodb.org/mongo-driver v1.3.1 h1:op56IfTQiaY2679w922KVWa3qcHdml2K/Io8ayAOUEQ= go.mongodb.org/mongo-driver v1.3.1/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE= go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg= @@ -264,6 +266,7 @@ golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= diff --git a/server/push/fcm/push_fcm.go b/server/push/fcm/push_fcm.go index 55c8e672d..8dfd32af2 100644 --- a/server/push/fcm/push_fcm.go +++ b/server/push/fcm/push_fcm.go @@ -27,7 +27,14 @@ const ( // Size of the input channel buffer. bufferSize = 1024 - // Maximum length of a text message in runes + // The number of push messages sent in one batch. FCM constant. + pushBatchSize = 100 + + // The number of sub/unsub requests sent in one batch. FCM constant. + subBatchSize = 1000 + + // Maximum length of a text message in runes. The message is clipped if length is exceeded. + // TODO: implement intelligent clipping of Drafty messages. maxMessageLength = 80 ) @@ -107,17 +114,31 @@ func (Handler) Init(jsonconf string) error { func sendNotifications(rcpt *push.Receipt, config *configType) { messages := PrepareNotifications(rcpt, &config.Android) - if len(messages) == 0 { + n := len(messages) + if n == 0 { return } ctx := context.Background() - for _, m := range messages { - _, err := handler.client.Send(ctx, m.Message) + for i := 0; i < n; i += pushBatchSize { + upper := i + pushBatchSize + if upper > n { + upper = n + } + var batch []*fcm.Message + for j := i; j < upper; j++ { + batch = append(batch, messages[j].Message) + } + resp, err := handler.client.SendAll(ctx, batch) if err != nil { - if handlePushError(err, m.Uid, m.DeviceId) { - return - } + // Complete failure. + log.Println("fcm SendAll failed", err) + break + } + + // Check for partial failure. + if handlePushErrors(resp, messages[i:upper]) { + break } } } @@ -127,26 +148,53 @@ func processSubscription(req *push.ChannelReq) { return } - ctx := context.Background() + if len(req.Devices) > subBatchSize { + req.Devices = req.Devices[0:subBatchSize] + } var err error var resp *fcm.TopicManagementResponse if req.Unsub { - resp, err = handler.client.UnsubscribeFromTopic(ctx, req.Devices, req.Channel) + resp, err = handler.client.UnsubscribeFromTopic(context.Background(), req.Devices, req.Channel) } else { - resp, err = handler.client.SubscribeToTopic(ctx, req.Devices, req.Channel) + resp, err = handler.client.SubscribeToTopic(context.Background(), req.Devices, req.Channel) } - if err != nil { - handlePushError(err) + // Complete failure. + log.Println("fcm sub or upsub failed", req.Unsub, err) } else { - + // Check for partial failure. + handleSubErrors(resp, req.Uid, req.Devices) } } -// handlePushError processes error returned by a call to FCM. +// handlePushError processes errors returned by a call to fcm.SendAll. // returns true to stop further processing of other messages. -func handlePushError(err error, uid types.Uid, deviceId string) bool { +func handlePushErrors(response *fcm.BatchResponse, batch []MessageData) bool { + if response.FailureCount <= 0 { + return false + } + + for i, resp := range response.Responses { + if handleFcmError(resp.Error, batch[i].Uid, batch[i].DeviceId) { + return true + } + } + return false +} + +func handleSubErrors(response *fcm.TopicManagementResponse, uid types.Uid, devices []string) { + if response.FailureCount <= 0 { + return + } + + for _, errinfo := range response.Errors { + // FCM documentation sucks. There is no list of possible errors so no action can be taken but logging. + log.Println("fcm sub/unsub error", errinfo.Reason, uid, devices[errinfo.Index]) + } +} + +func handleFcmError(err error, uid types.Uid, deviceId string) bool { if fcm.IsMessageRateExceeded(err) || fcm.IsServerUnavailable(err) || fcm.IsInternal(err) || @@ -155,22 +203,21 @@ func handlePushError(err error, uid types.Uid, deviceId string) bool { log.Println("fcm transient failure", err) return true } - if fcm.IsMismatchedCredential(err) || fcm.IsInvalidArgument(err) { // Config errors - log.Println("fcm push: failed", err) + log.Println("fcm: request failed", err) return true } if fcm.IsRegistrationTokenNotRegistered(err) { // Token is no longer valid. - log.Println("fcm push: invalid token", err) - err = store.Devices.Delete(uid, deviceId) - if err != nil { - log.Println("fcm push: failed to delete invalid token", err) + log.Println("fcm: invalid token", uid, err) + if err := store.Devices.Delete(uid, deviceId); err != nil { + log.Println("fcm: failed to delete invalid token", err) } } else { - log.Println("fcm push:", err) + // All other errors are treated as non-fatal. + log.Println("fcm error:", err) } return false } diff --git a/server/push/push.go b/server/push/push.go index 1550c91dd..603ef4d07 100644 --- a/server/push/push.go +++ b/server/push/push.go @@ -40,7 +40,7 @@ type Receipt struct { // ChannelReq is a request to subscribe/unsubscribe device IDs to channel (FCM topic). type ChannelReq struct { // Uid is the id of the user making request - Uid types.Uid + Uid t.Uid // Channel to subscribe to or unsubscribe from. Channel string `json:"channel"` // Devices to subscribe or unsubscribe. diff --git a/server/push/tnpg/push_tnpg.go b/server/push/tnpg/push_tnpg.go index ab388ea28..fefa9d8ab 100644 --- a/server/push/tnpg/push_tnpg.go +++ b/server/push/tnpg/push_tnpg.go @@ -67,10 +67,15 @@ const ( invalidArgument = "invalid-argument" messageRateExceeded = "message-rate-exceeded" mismatchedCredential = "mismatched-credential" + quotaExceeded = "quota-exceeded" registrationTokenNotRegistered = "registration-token-not-registered" + senderIDMismatch = "sender-id-mismatch" serverUnavailable = "server-unavailable" + thirdPartyAuthError = "third-party-auth-error" tooManyTopics = "too-many-topics" + unavailableError = "unavailable-error" unknownError = "unknown-error" + unregisteredError = "unregistered-error" ) // Init initializes the handler @@ -161,9 +166,6 @@ func postMessage(body interface{}, config *configType) (*batchResponse, error) { func sendPushes(rcpt *push.Receipt, config *configType) { messages := fcm.PrepareNotifications(rcpt, nil) - if len(messages) == 0 { - return - } n := len(messages) for i := 0; i < n; i += batchSize { @@ -216,15 +218,15 @@ func handleResponse(batch *batchResponse, messages []fcm.MessageData) { for i, resp := range batch.Responses { switch resp.ErrorCode { case "": // no error - case messageRateExceeded, serverUnavailable, internalError, unknownError: + case messageRateExceeded, quotaExceeded, serverUnavailable, unavailableError, internalError, unknownError: // Transient errors. Stop sending this batch. log.Println("tnpg transient failure", resp.ErrorMessage) return - case mismatchedCredential, invalidArgument: + case mismatchedCredential, invalidArgument, senderIDMismatch, thirdPartyAuthError, invalidAPNSCredentials: // Config errors log.Println("tnpg invalid config", resp.ErrorMessage) return - case registrationTokenNotRegistered: + case registrationTokenNotRegistered, unregisteredError: // Token is no longer valid. log.Println("tnpg invalid token", resp.ErrorMessage) if err := store.Devices.Delete(messages[i].Uid, messages[i].DeviceId); err != nil {