77
88 "github.com/android-sms-gateway/server/internal/sms-gateway/modules/push/domain"
99 "github.com/capcom6/go-helpers/cache"
10+ "github.com/capcom6/go-helpers/maps"
1011
1112 "github.com/prometheus/client_golang/prometheus"
1213 "github.com/prometheus/client_golang/prometheus/promauto"
@@ -99,15 +100,15 @@ func (s *Service) Run(ctx context.Context) {
99100func (s * Service ) Enqueue (token string , event * domain.Event ) error {
100101 wrapper := eventWrapper {
101102 token : token ,
102- event : * event ,
103+ event : event ,
103104 retries : 0 ,
104105 }
105106
106107 if err := s .cache .Set (token , wrapper ); err != nil {
107108 return fmt .Errorf ("can't add message to cache: %w" , err )
108109 }
109110
110- s .enqueuedCounter .WithLabelValues (string (event .Event )).Inc ()
111+ s .enqueuedCounter .WithLabelValues (string (event .Event () )).Inc ()
111112
112113 return nil
113114}
@@ -119,36 +120,41 @@ func (s *Service) sendAll(ctx context.Context) {
119120 return
120121 }
121122
122- messages := make ( map [ string ]domain. Event , len ( targets ))
123- for _ , w := range targets {
124- if w . retries >= 3 {
125- s . retriesCounter . WithLabelValues ( "max_attempts" ). Inc ()
126- s .logger .Warn ( "Retries exceeded " , zap .String ( "token " , w . token ))
127- continue
128- }
123+ messages := maps . MapValues ( targets , func ( w eventWrapper ) domain. Event {
124+ return * w . event
125+ })
126+
127+ s .logger .Info ( "Sending messages " , zap .Int ( "count " , len ( messages ) ))
128+ ctx , cancel := context . WithTimeout ( ctx , s . config . Timeout )
129+ defer cancel ()
129130
130- messages [w .token ] = w .event
131+ errs , err := s .client .Send (ctx , messages )
132+ if len (errs ) == 0 && err == nil {
133+ s .logger .Info ("Messages sent successfully" , zap .Int ("count" , len (messages )))
134+ return
131135 }
132136
133- if len (messages ) == 0 {
137+ if err != nil {
138+ s .logger .Error ("Can't send messages" , zap .Error (err ))
134139 return
135140 }
136141
137- s .logger .Info ("Sending messages" , zap .Int ("count" , len (messages )))
138- ctx , cancel := context .WithTimeout (ctx , s .config .Timeout )
139- if errs , err := s .client .Send (ctx , messages ); err != nil {
140- for token := range errs {
141- wrapper := targets [token ]
142- wrapper .retries ++
143- if s .cache .SetOrFail (token , wrapper ) != nil {
144- s .logger .Info ("Can't set message to cache" , zap .Error (err ))
145- } else {
146- s .logger .Info ("Retrying message" , zap .String ("token" , token ))
147- }
148- s .retriesCounter .WithLabelValues ("retried" ).Inc ()
142+ for token , sendErr := range errs {
143+ s .logger .Error ("Can't send message" , zap .Error (sendErr ), zap .String ("token" , token ))
144+
145+ wrapper := targets [token ]
146+ wrapper .retries ++
147+
148+ if wrapper .retries >= maxRetries {
149+ s .retriesCounter .WithLabelValues (string (RetryOutcomeMaxAttempts )).Inc ()
150+ s .logger .Warn ("Retries exceeded" , zap .String ("token" , token ))
151+ continue
149152 }
150153
151- s .logger .Error ("Can't send messages" , zap .Error (err ))
154+ if setErr := s .cache .SetOrFail (token , wrapper ); setErr != nil {
155+ s .logger .Info ("Can't set message to cache" , zap .Error (setErr ))
156+ }
157+
158+ s .retriesCounter .WithLabelValues (string (RetryOutcomeRetried )).Inc ()
152159 }
153- cancel ()
154160}
0 commit comments