@@ -173,37 +173,71 @@ func (s *Service) GetState(user models.User, id string) (*MessageStateOut, error
173173 return dto , nil
174174}
175175
176- func (s * Service ) Enqueue (device models.Device , message MessageIn , opts EnqueueOptions ) (MessageStateOut , error ) {
177- state := MessageStateOut {
176+ func (s * Service ) Enqueue (device models.Device , message MessageIn , opts EnqueueOptions ) (* MessageStateOut , error ) {
177+ msg , err := s .prepareMessage (device , message , opts )
178+ if err != nil {
179+ return nil , err
180+ }
181+
182+ state := & MessageStateOut {
178183 DeviceID : device .ID ,
179184 MessageStateIn : MessageStateIn {
180- State : ProcessingStatePending ,
181- Recipients : make ([]smsgateway.RecipientState , len (message .PhoneNumbers )),
185+ ID : msg .ExtID ,
186+ State : ProcessingStatePending ,
187+ Recipients : lo .Map (
188+ msg .Recipients ,
189+ func (item MessageRecipient , _ int ) smsgateway.RecipientState { return modelToRecipientState (item ) },
190+ ),
191+ States : map [string ]time.Time {},
182192 },
193+ IsHashed : false ,
194+ IsEncrypted : msg .IsEncrypted ,
183195 }
184196
197+ if insErr := s .messages .Insert (msg ); insErr != nil {
198+ return state , insErr
199+ }
200+
201+ if cacheErr := s .cache .Set (context .Background (), device .UserID , msg .ExtID , anys .AsPointer (modelToMessageState (* msg ))); cacheErr != nil {
202+ s .logger .Warn ("failed to cache message" , zap .String ("id" , msg .ExtID ), zap .Error (cacheErr ))
203+ }
204+ s .metrics .IncTotal (string (msg .State ))
205+
206+ go func (userID , deviceID string ) {
207+ if ntfErr := s .eventsSvc .Notify (userID , & deviceID , events .NewMessageEnqueuedEvent ()); ntfErr != nil {
208+ s .logger .Error (
209+ "failed to notify device" ,
210+ zap .Error (ntfErr ),
211+ zap .String ("user_id" , userID ),
212+ zap .String ("device_id" , deviceID ),
213+ )
214+ }
215+ }(device .UserID , device .ID )
216+
217+ return state , nil
218+ }
219+
220+ func (s * Service ) prepareMessage (device models.Device , message MessageIn , opts EnqueueOptions ) (* Message , error ) {
185221 var phone string
186222 var err error
187223 for i , v := range message .PhoneNumbers {
188224 if message .IsEncrypted || opts .SkipPhoneValidation {
189225 phone = v
190226 } else {
191227 if phone , err = cleanPhoneNumber (v ); err != nil {
192- return state , fmt .Errorf ("failed to use phone in row %d: %w" , i + 1 , err )
228+ return nil , fmt .Errorf ("failed to use phone in row %d: %w" , i + 1 , err )
193229 }
194230 }
195231
196232 message .PhoneNumbers [i ] = phone
197-
198- state .Recipients [i ] = smsgateway.RecipientState {
199- PhoneNumber : phone ,
200- State : smsgateway .ProcessingStatePending ,
201- }
202233 }
203234
204235 validUntil := message .ValidUntil
205236 if message .TTL != nil && * message .TTL > 0 {
206- validUntil = anys .AsPointer (time .Now ().Add (time .Duration (* message .TTL ) * time .Second ))
237+ //nolint:gosec // not a problem
238+ validUntil = anys .AsPointer (
239+ time .Now ().Add (time .Duration (* message .TTL ) * time .Second ),
240+ )
207241 }
208242
209243 msg := Message {
@@ -221,43 +255,22 @@ func (s *Service) Enqueue(device models.Device, message MessageIn, opts EnqueueO
221255 }
222256
223257 if message .TextContent != nil {
224- if err := msg .SetTextContent (* message .TextContent ); err != nil {
225- return state , fmt .Errorf ("failed to set text content: %w" , err )
258+ if setErr := msg .SetTextContent (* message .TextContent ); setErr != nil {
259+ return nil , fmt .Errorf ("failed to set text content: %w" , setErr )
226260 }
227261 } else if message .DataContent != nil {
228- if err := msg .SetDataContent (* message .DataContent ); err != nil {
229- return state , fmt .Errorf ("failed to set data content: %w" , err )
262+ if setErr := msg .SetDataContent (* message .DataContent ); setErr != nil {
263+ return nil , fmt .Errorf ("failed to set data content: %w" , setErr )
230264 }
231265 } else {
232- return state , ErrNoContent
266+ return nil , ErrNoContent
233267 }
234268
235269 if msg .ExtID == "" {
236270 msg .ExtID = s .idgen ()
237271 }
238- state .ID = msg .ExtID
239-
240- if err := s .messages .Insert (& msg ); err != nil {
241- return state , err
242- }
243-
244- if err := s .cache .Set (context .Background (), device .UserID , msg .ExtID , anys .AsPointer (modelToMessageState (msg ))); err != nil {
245- s .logger .Warn ("failed to cache message" , zap .String ("id" , msg .ExtID ), zap .Error (err ))
246- }
247- s .metrics .IncTotal (string (msg .State ))
248272
249- go func (userID , deviceID string ) {
250- if err := s .eventsSvc .Notify (userID , & deviceID , events .NewMessageEnqueuedEvent ()); err != nil {
251- s .logger .Error (
252- "failed to notify device" ,
253- zap .Error (err ),
254- zap .String ("user_id" , userID ),
255- zap .String ("device_id" , deviceID ),
256- )
257- }
258- }(device .UserID , device .ID )
259-
260- return state , nil
273+ return & msg , nil
261274}
262275
263276func (s * Service ) ExportInbox (device models.Device , since , until time.Time ) error {
0 commit comments