Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions internal/sms-gateway/handlers/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (h *upstreamHandler) postPush(c *fiber.Ctx) error {
return err
}

event := push.Event{
Event: anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
Data: v.Data,
}
event := push.NewEvent(
anys.ZeroDefault(v.Event, smsgateway.PushMessageEnqueued),
v.Data,
)

if err := h.pushSvc.Enqueue(v.Token, &event); err != nil {
if err := h.pushSvc.Enqueue(v.Token, event); err != nil {
h.Logger.Error("Can't push message", zap.Error(err))
}
}
Expand Down
22 changes: 22 additions & 0 deletions internal/sms-gateway/modules/push/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package push

import "time"

const (
maxRetries = 3
blacklistTimeout = 15 * time.Minute
)

type RetryOutcome string

const (
RetryOutcomeRetried RetryOutcome = "retried"
RetryOutcomeMaxAttempts RetryOutcome = "max_attempts"
)

type BlacklistOperation string

const (
BlacklistOperationAdded BlacklistOperation = "added"
BlacklistOperationSkipped BlacklistOperation = "skipped"
)
20 changes: 14 additions & 6 deletions internal/sms-gateway/modules/push/domain/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,30 @@ import (
)

type Event struct {
Event smsgateway.PushEventType
Data map[string]string
event smsgateway.PushEventType
data map[string]string
}

func (e *Event) Event() smsgateway.PushEventType {
return e.event
}

func (e *Event) Data() map[string]string {
return e.data
}

func (e *Event) Map() map[string]string {
json, _ := json.Marshal(e.Data)
json, _ := json.Marshal(e.data)

return map[string]string{
"event": string(e.Event),
"event": string(e.event),
"data": string(json),
}
}

func NewEvent(event smsgateway.PushEventType, data map[string]string) *Event {
return &Event{
Event: event,
Data: data,
event: event,
data: data,
}
}
9 changes: 4 additions & 5 deletions internal/sms-gateway/modules/push/fcm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package fcm

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -53,8 +52,8 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}

func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
errs := make([]error, 0, len(messages))
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
errs := make(map[string]error, len(messages))
for address, payload := range messages {
_, err := c.client.Send(ctx, &messaging.Message{
Data: payload.Map(),
Expand All @@ -65,11 +64,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
})

if err != nil {
errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err))
errs[address] = fmt.Errorf("can't send message to %s: %w", address, err)
}
}

return errors.Join(errs...)
return errs, nil
}

func (c *Client) Close(ctx context.Context) error {
Expand Down
100 changes: 88 additions & 12 deletions internal/sms-gateway/modules/push/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/capcom6/go-helpers/cache"
"github.com/capcom6/go-helpers/maps"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -38,9 +39,12 @@ type Service struct {

client client

cache *cache.Cache[domain.Event]
cache *cache.Cache[eventWrapper]
blacklist *cache.Cache[struct{}]

enqueuedCounter *prometheus.CounterVec
enqueuedCounter *prometheus.CounterVec
retriesCounter *prometheus.CounterVec
blacklistCounter *prometheus.CounterVec

logger *zap.Logger
}
Expand All @@ -60,12 +64,34 @@ func New(params Params) *Service {
Help: "Total number of messages enqueued",
}, []string{"event"})

retriesCounter := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "push",
Name: "retries_total",
Help: "Total retry attempts",
}, []string{"outcome"})

blacklistCounter := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "sms",
Subsystem: "push",
Name: "blacklist_total",
Help: "Blacklist operations",
}, []string{"operation"})

return &Service{
config: params.Config,
client: params.Client,
cache: cache.New[domain.Event](cache.Config{}),
enqueuedCounter: enqueuedCounter,
logger: params.Logger,
config: params.Config,
client: params.Client,

cache: cache.New[eventWrapper](cache.Config{}),
blacklist: cache.New[struct{}](cache.Config{
TTL: blacklistTimeout,
}),

enqueuedCounter: enqueuedCounter,
retriesCounter: retriesCounter,
blacklistCounter: blacklistCounter,

logger: params.Logger,
}
}

Expand All @@ -86,11 +112,23 @@ func (s *Service) Run(ctx context.Context) {

// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
func (s *Service) Enqueue(token string, event *domain.Event) error {
if err := s.cache.Set(token, *event); err != nil {
if _, err := s.blacklist.Get(token); err == nil {
s.blacklistCounter.WithLabelValues(string(BlacklistOperationSkipped)).Inc()
s.logger.Debug("Skipping blacklisted token", zap.String("token", token))
return nil
}

wrapper := eventWrapper{
token: token,
event: event,
retries: 0,
}

if err := s.cache.Set(token, wrapper); err != nil {
return fmt.Errorf("can't add message to cache: %w", err)
}

s.enqueuedCounter.WithLabelValues(string(event.Event)).Inc()
s.enqueuedCounter.WithLabelValues(string(event.Event())).Inc()

return nil
}
Expand All @@ -102,10 +140,48 @@ func (s *Service) sendAll(ctx context.Context) {
return
}

s.logger.Info("Sending messages", zap.Int("count", len(targets)))
messages := maps.MapValues(targets, func(w eventWrapper) domain.Event {
return *w.event
})

s.logger.Info("Sending messages", zap.Int("count", len(messages)))
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
if err := s.client.Send(ctx, targets); err != nil {
defer cancel()

errs, err := s.client.Send(ctx, messages)
if len(errs) == 0 && err == nil {
s.logger.Info("Messages sent successfully", zap.Int("count", len(messages)))
return
}

if err != nil {
s.logger.Error("Can't send messages", zap.Error(err))
return
}

for token, sendErr := range errs {
s.logger.Error("Can't send message", zap.Error(sendErr), zap.String("token", token))

wrapper := targets[token]
wrapper.retries++

if wrapper.retries >= maxRetries {
if err := s.blacklist.Set(token, struct{}{}); err != nil {
s.logger.Warn("Can't add to blacklist", zap.String("token", token), zap.Error(err))
}

s.blacklistCounter.WithLabelValues(string(BlacklistOperationAdded)).Inc()
s.retriesCounter.WithLabelValues(string(RetryOutcomeMaxAttempts)).Inc()
s.logger.Warn("Retries exceeded, blacklisting token",
zap.String("token", token),
zap.Duration("ttl", blacklistTimeout))
continue
}

if setErr := s.cache.SetOrFail(token, wrapper); setErr != nil {
s.logger.Info("Can't set message to cache", zap.Error(setErr))
}

s.retriesCounter.WithLabelValues(string(RetryOutcomeRetried)).Inc()
}
cancel()
}
10 changes: 9 additions & 1 deletion internal/sms-gateway/modules/push/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,25 @@ import (
type Mode string
type Event = domain.Event

var NewEvent = domain.NewEvent

const (
ModeFCM Mode = "fcm"
ModeUpstream Mode = "upstream"
)

type client interface {
Open(ctx context.Context) error
Send(ctx context.Context, messages map[string]domain.Event) error
Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error)
Close(ctx context.Context) error
}

type eventWrapper struct {
token string
event *domain.Event
retries int
}

func NewMessageEnqueuedEvent() *domain.Event {
return domain.NewEvent(smsgateway.PushMessageEnqueued, nil)
}
Expand Down
24 changes: 16 additions & 8 deletions internal/sms-gateway/modules/push/upstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/android-sms-gateway/client-go/smsgateway"
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
"github.com/capcom6/go-helpers/maps"
)

const BASE_URL = "https://api.sms-gate.app/upstream/v1"
Expand Down Expand Up @@ -41,33 +42,34 @@ func (c *Client) Open(ctx context.Context) error {
return nil
}

func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))

for address, data := range messages {
payload = append(payload, smsgateway.PushNotification{
Token: address,
Event: data.Event,
Data: data.Data,
Event: data.Event(),
Data: data.Data(),
})
}

payloadBytes, err := json.Marshal(payload)

if err != nil {
return fmt.Errorf("can't marshal payload: %w", err)
return nil, fmt.Errorf("can't marshal payload: %w", err)
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes))
if err != nil {
return fmt.Errorf("can't create request: %w", err)
return nil, fmt.Errorf("can't create request: %w", err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "android-sms-gateway/1.x (server; golang)")

resp, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("can't send request: %w", err)
return c.mapErrors(messages, fmt.Errorf("can't send request: %w", err)), nil
}

defer func() {
Expand All @@ -76,10 +78,16 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
}()

if resp.StatusCode >= 400 {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
return c.mapErrors(messages, fmt.Errorf("unexpected status code: %d", resp.StatusCode)), nil
}

return nil
return nil, nil
}

func (c *Client) mapErrors(messages map[string]domain.Event, err error) map[string]error {
return maps.MapValues(messages, func(e domain.Event) error {
return err
})
}

func (c *Client) Close(ctx context.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/swagger/docs/requests.http
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Authorization: Basic {{credentials}}
"{{phone}}"
],
"withDeliveryReport": true,
"priority": 128,
"priority": 127,
"simNumber": {{$randomInt 1 2}}
}

Expand Down
Loading