77
88 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
99 "github.com/capcom6/go-helpers/cache"
10+ "github.com/capcom6/go-helpers/maps"
1011
1112 "github.com/prometheus/client_golang/prometheus"
1213 "github.com/prometheus/client_golang/prometheus/promauto"
@@ -38,9 +39,12 @@ type Service struct {
3839
3940 client client
4041
41- cache * cache.Cache [domain.Event ]
42+ cache * cache.Cache [eventWrapper ]
43+ blacklist * cache.Cache [struct {}]
4244
43- enqueuedCounter * prometheus.CounterVec
45+ enqueuedCounter * prometheus.CounterVec
46+ retriesCounter * prometheus.CounterVec
47+ blacklistCounter * prometheus.CounterVec
4448
4549 logger * zap.Logger
4650}
@@ -60,12 +64,34 @@ func New(params Params) *Service {
6064 Help : "Total number of messages enqueued" ,
6165 }, []string {"event" })
6266
67+ retriesCounter := promauto .NewCounterVec (prometheus.CounterOpts {
68+ Namespace : "sms" ,
69+ Subsystem : "push" ,
70+ Name : "retries_total" ,
71+ Help : "Total retry attempts" ,
72+ }, []string {"outcome" })
73+
74+ blacklistCounter := promauto .NewCounterVec (prometheus.CounterOpts {
75+ Namespace : "sms" ,
76+ Subsystem : "push" ,
77+ Name : "blacklist_total" ,
78+ Help : "Blacklist operations" ,
79+ }, []string {"operation" })
80+
6381 return & Service {
64- config : params .Config ,
65- client : params .Client ,
66- cache : cache.New [domain.Event ](cache.Config {}),
67- enqueuedCounter : enqueuedCounter ,
68- logger : params .Logger ,
82+ config : params .Config ,
83+ client : params .Client ,
84+
85+ cache : cache.New [eventWrapper ](cache.Config {}),
86+ blacklist : cache.New [struct {}](cache.Config {
87+ TTL : blacklistTimeout ,
88+ }),
89+
90+ enqueuedCounter : enqueuedCounter ,
91+ retriesCounter : retriesCounter ,
92+ blacklistCounter : blacklistCounter ,
93+
94+ logger : params .Logger ,
6995 }
7096}
7197
@@ -86,11 +112,23 @@ func (s *Service) Run(ctx context.Context) {
86112
87113// Enqueue adds the data to the cache and immediately sends all messages if the debounce is 0.
88114func (s * Service ) Enqueue (token string , event * domain.Event ) error {
89- if err := s .cache .Set (token , * event ); err != nil {
115+ if _ , err := s .blacklist .Get (token ); err == nil {
116+ s .blacklistCounter .WithLabelValues (string (BlacklistOperationSkipped )).Inc ()
117+ s .logger .Debug ("Skipping blacklisted token" , zap .String ("token" , token ))
118+ return nil
119+ }
120+
121+ wrapper := eventWrapper {
122+ token : token ,
123+ event : event ,
124+ retries : 0 ,
125+ }
126+
127+ if err := s .cache .Set (token , wrapper ); err != nil {
90128 return fmt .Errorf ("can't add message to cache: %w" , err )
91129 }
92130
93- s .enqueuedCounter .WithLabelValues (string (event .Event )).Inc ()
131+ s .enqueuedCounter .WithLabelValues (string (event .Event () )).Inc ()
94132
95133 return nil
96134}
@@ -102,10 +140,48 @@ func (s *Service) sendAll(ctx context.Context) {
102140 return
103141 }
104142
105- s .logger .Info ("Sending messages" , zap .Int ("count" , len (targets )))
143+ messages := maps .MapValues (targets , func (w eventWrapper ) domain.Event {
144+ return * w .event
145+ })
146+
147+ s .logger .Info ("Sending messages" , zap .Int ("count" , len (messages )))
106148 ctx , cancel := context .WithTimeout (ctx , s .config .Timeout )
107- if err := s .client .Send (ctx , targets ); err != nil {
149+ defer cancel ()
150+
151+ errs , err := s .client .Send (ctx , messages )
152+ if len (errs ) == 0 && err == nil {
153+ s .logger .Info ("Messages sent successfully" , zap .Int ("count" , len (messages )))
154+ return
155+ }
156+
157+ if err != nil {
108158 s .logger .Error ("Can't send messages" , zap .Error (err ))
159+ return
160+ }
161+
162+ for token , sendErr := range errs {
163+ s .logger .Error ("Can't send message" , zap .Error (sendErr ), zap .String ("token" , token ))
164+
165+ wrapper := targets [token ]
166+ wrapper .retries ++
167+
168+ if wrapper .retries >= maxRetries {
169+ if err := s .blacklist .Set (token , struct {}{}); err != nil {
170+ s .logger .Warn ("Can't add to blacklist" , zap .String ("token" , token ), zap .Error (err ))
171+ }
172+
173+ s .blacklistCounter .WithLabelValues (string (BlacklistOperationAdded )).Inc ()
174+ s .retriesCounter .WithLabelValues (string (RetryOutcomeMaxAttempts )).Inc ()
175+ s .logger .Warn ("Retries exceeded, blacklisting token" ,
176+ zap .String ("token" , token ),
177+ zap .Duration ("ttl" , blacklistTimeout ))
178+ continue
179+ }
180+
181+ if setErr := s .cache .SetOrFail (token , wrapper ); setErr != nil {
182+ s .logger .Info ("Can't set message to cache" , zap .Error (setErr ))
183+ }
184+
185+ s .retriesCounter .WithLabelValues (string (RetryOutcomeRetried )).Inc ()
109186 }
110- cancel ()
111187}
0 commit comments