Skip to content

Commit faf5974

Browse files
committed
[push] add send retries
1 parent e457a16 commit faf5974

File tree

5 files changed

+79
-20
lines changed

5 files changed

+79
-20
lines changed

internal/sms-gateway/modules/push/fcm/client.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"maps"
8+
"slices"
79
"sync"
810

911
firebase "firebase.google.com/go/v4"
@@ -53,8 +55,8 @@ func (c *Client) Open(ctx context.Context) error {
5355
return nil
5456
}
5557

56-
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
57-
errs := make([]error, 0, len(messages))
58+
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
59+
errs := make(map[string]error, len(messages))
5860
for address, payload := range messages {
5961
_, err := c.client.Send(ctx, &messaging.Message{
6062
Data: payload.Map(),
@@ -65,11 +67,11 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
6567
})
6668

6769
if err != nil {
68-
errs = append(errs, fmt.Errorf("can't send message to %s: %w", address, err))
70+
errs[address] = fmt.Errorf("can't send message to %s: %w", address, err)
6971
}
7072
}
7173

72-
return errors.Join(errs...)
74+
return errs, errors.Join(slices.Collect(maps.Values(errs))...)
7375
}
7476

7577
func (c *Client) Close(ctx context.Context) error {

internal/sms-gateway/modules/push/service.go

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ type Service struct {
3838

3939
client client
4040

41-
cache *cache.Cache[domain.Event]
41+
cache *cache.Cache[eventWrapper]
4242

4343
enqueuedCounter *prometheus.CounterVec
44+
retriesCounter *prometheus.CounterVec
4445

4546
logger *zap.Logger
4647
}
@@ -60,12 +61,22 @@ func New(params Params) *Service {
6061
Help: "Total number of messages enqueued",
6162
}, []string{"event"})
6263

64+
retriesCounter := promauto.NewCounterVec(prometheus.CounterOpts{
65+
Namespace: "sms",
66+
Subsystem: "push",
67+
Name: "retries_total",
68+
Help: "Total retry attempts",
69+
}, []string{"outcome"})
70+
6371
return &Service{
64-
config: params.Config,
65-
client: params.Client,
66-
cache: cache.New[domain.Event](cache.Config{}),
72+
config: params.Config,
73+
client: params.Client,
74+
cache: cache.New[eventWrapper](cache.Config{}),
75+
6776
enqueuedCounter: enqueuedCounter,
68-
logger: params.Logger,
77+
retriesCounter: retriesCounter,
78+
79+
logger: params.Logger,
6980
}
7081
}
7182

@@ -86,7 +97,13 @@ func (s *Service) Run(ctx context.Context) {
8697

8798
// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
8899
func (s *Service) Enqueue(token string, event *domain.Event) error {
89-
if err := s.cache.Set(token, *event); err != nil {
100+
wrapper := eventWrapper{
101+
token: token,
102+
event: *event,
103+
retries: 0,
104+
}
105+
106+
if err := s.cache.Set(token, wrapper); err != nil {
90107
return fmt.Errorf("can't add message to cache: %w", err)
91108
}
92109

@@ -102,9 +119,35 @@ func (s *Service) sendAll(ctx context.Context) {
102119
return
103120
}
104121

105-
s.logger.Info("Sending messages", zap.Int("count", len(targets)))
122+
messages := make(map[string]domain.Event, len(targets))
123+
for _, w := range targets {
124+
if w.retries >= 3 {
125+
s.retriesCounter.WithLabelValues("max_attempts").Inc()
126+
s.logger.Warn("Retries exceeded", zap.String("token", w.token))
127+
continue
128+
}
129+
130+
messages[w.token] = w.event
131+
}
132+
133+
if len(messages) == 0 {
134+
return
135+
}
136+
137+
s.logger.Info("Sending messages", zap.Int("count", len(messages)))
106138
ctx, cancel := context.WithTimeout(ctx, s.config.Timeout)
107-
if err := s.client.Send(ctx, targets); err != nil {
139+
if errs, err := s.client.Send(ctx, messages); err != nil {
140+
for token := range errs {
141+
wrapper := targets[token]
142+
wrapper.retries++
143+
if s.cache.SetOrFail(token, wrapper) != nil {
144+
s.logger.Info("Can't set message to cache", zap.Error(err))
145+
} else {
146+
s.logger.Info("Retrying message", zap.String("token", token))
147+
}
148+
s.retriesCounter.WithLabelValues("retried").Inc()
149+
}
150+
108151
s.logger.Error("Can't send messages", zap.Error(err))
109152
}
110153
cancel()

internal/sms-gateway/modules/push/types.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,16 @@ const (
1818

1919
type client interface {
2020
Open(ctx context.Context) error
21-
Send(ctx context.Context, messages map[string]domain.Event) error
21+
Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error)
2222
Close(ctx context.Context) error
2323
}
2424

25+
type eventWrapper struct {
26+
token string
27+
event domain.Event
28+
retries int
29+
}
30+
2531
func NewMessageEnqueuedEvent() *domain.Event {
2632
return domain.NewEvent(smsgateway.PushMessageEnqueued, nil)
2733
}

internal/sms-gateway/modules/push/upstream/client.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/android-sms-gateway/client-go/smsgateway"
1313
"github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
14+
"github.com/capcom6/go-helpers/maps"
1415
)
1516

1617
const BASE_URL = "https://api.sms-gate.app/upstream/v1"
@@ -41,7 +42,7 @@ func (c *Client) Open(ctx context.Context) error {
4142
return nil
4243
}
4344

44-
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) error {
45+
func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) (map[string]error, error) {
4546
payload := make(smsgateway.UpstreamPushRequest, 0, len(messages))
4647

4748
for address, data := range messages {
@@ -53,21 +54,22 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
5354
}
5455

5556
payloadBytes, err := json.Marshal(payload)
57+
5658
if err != nil {
57-
return fmt.Errorf("can't marshal payload: %w", err)
59+
return c.sendError(messages, fmt.Errorf("can't marshal payload: %w", err))
5860
}
5961

6062
req, err := http.NewRequestWithContext(ctx, http.MethodPost, BASE_URL+"/push", bytes.NewReader(payloadBytes))
6163
if err != nil {
62-
return fmt.Errorf("can't create request: %w", err)
64+
return c.sendError(messages, fmt.Errorf("can't create request: %w", err))
6365
}
6466

6567
req.Header.Set("Content-Type", "application/json")
6668
req.Header.Set("User-Agent", "android-sms-gateway/1.x (server; golang)")
6769

6870
resp, err := c.client.Do(req)
6971
if err != nil {
70-
return fmt.Errorf("can't send request: %w", err)
72+
return c.sendError(messages, fmt.Errorf("can't send request: %w", err))
7173
}
7274

7375
defer func() {
@@ -76,10 +78,16 @@ func (c *Client) Send(ctx context.Context, messages map[string]domain.Event) err
7678
}()
7779

7880
if resp.StatusCode >= 400 {
79-
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
81+
return c.sendError(messages, fmt.Errorf("unexpected status code: %d", resp.StatusCode))
8082
}
8183

82-
return nil
84+
return nil, nil
85+
}
86+
87+
func (c *Client) sendError(messages map[string]domain.Event, err error) (map[string]error, error) {
88+
return maps.MapValues(messages, func(e domain.Event) error {
89+
return err
90+
}), err
8391
}
8492

8593
func (c *Client) Close(ctx context.Context) error {

pkg/swagger/docs/requests.http

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ Authorization: Basic {{credentials}}
2121
"{{phone}}"
2222
],
2323
"withDeliveryReport": true,
24-
"priority": 128,
24+
"priority": 127,
2525
"simNumber": {{$randomInt 1 2}}
2626
}
2727

0 commit comments

Comments
 (0)