Skip to content

Commit

Permalink
more work on topic push, updated fcm errors
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed Aug 11, 2020
1 parent cc49106 commit 30f93cc
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 29 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/prometheus/common v0.9.1
github.com/tinode/jsonco v1.0.0
github.com/tinode/snowflake v1.0.0
go.etcd.io/bbolt v1.3.5 // indirect
go.mongodb.org/mongo-driver v1.3.1
golang.org/x/crypto v0.0.0-20200320181102-891825fb96df
golang.org/x/net v0.0.0-20200320220750-118fecf932d8
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo=
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.etcd.io/bbolt v1.3.5 h1:XAzx9gjCb0Rxj7EoqcClPD1d5ZBxZJk0jbuoPHenBt0=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.mongodb.org/mongo-driver v1.3.1 h1:op56IfTQiaY2679w922KVWa3qcHdml2K/Io8ayAOUEQ=
go.mongodb.org/mongo-driver v1.3.1/go.mod h1:MSWZXKOynuguX+JSvwP8i+58jYCXxbia8HS3gZBapIE=
go.opencensus.io v0.21.0 h1:mU6zScU4U1YAFPHEHYk+3JC4SY7JxgkqS10ZOSyksNg=
Expand Down Expand Up @@ -264,6 +266,7 @@ golang.org/x/sys v0.0.0-20190531175056-4c3a928424d2/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82 h1:ywK/j/KkyTHcdyYSZNXGjMwgmDSfjglYZ3vStQ/gSCU=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
Expand Down
91 changes: 69 additions & 22 deletions server/push/fcm/push_fcm.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ const (
// Size of the input channel buffer.
bufferSize = 1024

// Maximum length of a text message in runes
// The number of push messages sent in one batch. FCM constant.
pushBatchSize = 100

// The number of sub/unsub requests sent in one batch. FCM constant.
subBatchSize = 1000

// Maximum length of a text message in runes. The message is clipped if length is exceeded.
// TODO: implement intelligent clipping of Drafty messages.
maxMessageLength = 80
)

Expand Down Expand Up @@ -107,17 +114,31 @@ func (Handler) Init(jsonconf string) error {

func sendNotifications(rcpt *push.Receipt, config *configType) {
messages := PrepareNotifications(rcpt, &config.Android)
if len(messages) == 0 {
n := len(messages)
if n == 0 {
return
}

ctx := context.Background()
for _, m := range messages {
_, err := handler.client.Send(ctx, m.Message)
for i := 0; i < n; i += pushBatchSize {
upper := i + pushBatchSize
if upper > n {
upper = n
}
var batch []*fcm.Message
for j := i; j < upper; j++ {
batch = append(batch, messages[j].Message)
}
resp, err := handler.client.SendAll(ctx, batch)
if err != nil {
if handlePushError(err, m.Uid, m.DeviceId) {
return
}
// Complete failure.
log.Println("fcm SendAll failed", err)
break
}

// Check for partial failure.
if handlePushErrors(resp, messages[i:upper]) {
break
}
}
}
Expand All @@ -127,26 +148,53 @@ func processSubscription(req *push.ChannelReq) {
return
}

ctx := context.Background()
if len(req.Devices) > subBatchSize {
req.Devices = req.Devices[0:subBatchSize]
}

var err error
var resp *fcm.TopicManagementResponse
if req.Unsub {
resp, err = handler.client.UnsubscribeFromTopic(ctx, req.Devices, req.Channel)
resp, err = handler.client.UnsubscribeFromTopic(context.Background(), req.Devices, req.Channel)
} else {
resp, err = handler.client.SubscribeToTopic(ctx, req.Devices, req.Channel)
resp, err = handler.client.SubscribeToTopic(context.Background(), req.Devices, req.Channel)
}

if err != nil {
handlePushError(err)
// Complete failure.
log.Println("fcm sub or upsub failed", req.Unsub, err)
} else {

// Check for partial failure.
handleSubErrors(resp, req.Uid, req.Devices)
}
}

// handlePushError processes error returned by a call to FCM.
// handlePushError processes errors returned by a call to fcm.SendAll.
// returns true to stop further processing of other messages.
func handlePushError(err error, uid types.Uid, deviceId string) bool {
func handlePushErrors(response *fcm.BatchResponse, batch []MessageData) bool {
if response.FailureCount <= 0 {
return false
}

for i, resp := range response.Responses {
if handleFcmError(resp.Error, batch[i].Uid, batch[i].DeviceId) {
return true
}
}
return false
}

func handleSubErrors(response *fcm.TopicManagementResponse, uid types.Uid, devices []string) {
if response.FailureCount <= 0 {
return
}

for _, errinfo := range response.Errors {
// FCM documentation sucks. There is no list of possible errors so no action can be taken but logging.
log.Println("fcm sub/unsub error", errinfo.Reason, uid, devices[errinfo.Index])
}
}

func handleFcmError(err error, uid types.Uid, deviceId string) bool {
if fcm.IsMessageRateExceeded(err) ||
fcm.IsServerUnavailable(err) ||
fcm.IsInternal(err) ||
Expand All @@ -155,22 +203,21 @@ func handlePushError(err error, uid types.Uid, deviceId string) bool {
log.Println("fcm transient failure", err)
return true
}

if fcm.IsMismatchedCredential(err) || fcm.IsInvalidArgument(err) {
// Config errors
log.Println("fcm push: failed", err)
log.Println("fcm: request failed", err)
return true
}

if fcm.IsRegistrationTokenNotRegistered(err) {
// Token is no longer valid.
log.Println("fcm push: invalid token", err)
err = store.Devices.Delete(uid, deviceId)
if err != nil {
log.Println("fcm push: failed to delete invalid token", err)
log.Println("fcm: invalid token", uid, err)
if err := store.Devices.Delete(uid, deviceId); err != nil {
log.Println("fcm: failed to delete invalid token", err)
}
} else {
log.Println("fcm push:", err)
// All other errors are treated as non-fatal.
log.Println("fcm error:", err)
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion server/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Receipt struct {
// ChannelReq is a request to subscribe/unsubscribe device IDs to channel (FCM topic).
type ChannelReq struct {
// Uid is the id of the user making request
Uid types.Uid
Uid t.Uid
// Channel to subscribe to or unsubscribe from.
Channel string `json:"channel"`
// Devices to subscribe or unsubscribe.
Expand Down
14 changes: 8 additions & 6 deletions server/push/tnpg/push_tnpg.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,15 @@ const (
invalidArgument = "invalid-argument"
messageRateExceeded = "message-rate-exceeded"
mismatchedCredential = "mismatched-credential"
quotaExceeded = "quota-exceeded"
registrationTokenNotRegistered = "registration-token-not-registered"
senderIDMismatch = "sender-id-mismatch"
serverUnavailable = "server-unavailable"
thirdPartyAuthError = "third-party-auth-error"
tooManyTopics = "too-many-topics"
unavailableError = "unavailable-error"
unknownError = "unknown-error"
unregisteredError = "unregistered-error"
)

// Init initializes the handler
Expand Down Expand Up @@ -161,9 +166,6 @@ func postMessage(body interface{}, config *configType) (*batchResponse, error) {

func sendPushes(rcpt *push.Receipt, config *configType) {
messages := fcm.PrepareNotifications(rcpt, nil)
if len(messages) == 0 {
return
}

n := len(messages)
for i := 0; i < n; i += batchSize {
Expand Down Expand Up @@ -216,15 +218,15 @@ func handleResponse(batch *batchResponse, messages []fcm.MessageData) {
for i, resp := range batch.Responses {
switch resp.ErrorCode {
case "": // no error
case messageRateExceeded, serverUnavailable, internalError, unknownError:
case messageRateExceeded, quotaExceeded, serverUnavailable, unavailableError, internalError, unknownError:
// Transient errors. Stop sending this batch.
log.Println("tnpg transient failure", resp.ErrorMessage)
return
case mismatchedCredential, invalidArgument:
case mismatchedCredential, invalidArgument, senderIDMismatch, thirdPartyAuthError, invalidAPNSCredentials:
// Config errors
log.Println("tnpg invalid config", resp.ErrorMessage)
return
case registrationTokenNotRegistered:
case registrationTokenNotRegistered, unregisteredError:
// Token is no longer valid.
log.Println("tnpg invalid token", resp.ErrorMessage)
if err := store.Devices.Delete(messages[i].Uid, messages[i].DeviceId); err != nil {
Expand Down

0 comments on commit 30f93cc

Please sign in to comment.