26
26
cast ,
27
27
)
28
28
29
+ from synapse .api .constants import EventContentFields
29
30
from synapse .logging import issue9533_logger
30
- from synapse .logging .opentracing import log_kv , set_tag , trace
31
+ from synapse .logging .opentracing import (
32
+ SynapseTags ,
33
+ log_kv ,
34
+ set_tag ,
35
+ start_active_span ,
36
+ trace ,
37
+ )
31
38
from synapse .replication .tcp .streams import ToDeviceStream
32
39
from synapse .storage ._base import SQLBaseStore , db_to_json
33
40
from synapse .storage .database import (
@@ -397,6 +404,17 @@ def get_device_messages_txn(
397
404
(recipient_user_id , recipient_device_id ), []
398
405
).append (message_dict )
399
406
407
+ # start a new span for each message, so that we can tag each separately
408
+ with start_active_span ("get_to_device_message" ):
409
+ set_tag (SynapseTags .TO_DEVICE_TYPE , message_dict ["type" ])
410
+ set_tag (SynapseTags .TO_DEVICE_SENDER , message_dict ["sender" ])
411
+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT , recipient_user_id )
412
+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT_DEVICE , recipient_device_id )
413
+ set_tag (
414
+ SynapseTags .TO_DEVICE_MSGID ,
415
+ message_dict ["content" ].get (EventContentFields .TO_DEVICE_MSGID ),
416
+ )
417
+
400
418
if limit is not None and rowcount == limit :
401
419
# We ended up bumping up against the message limit. There may be more messages
402
420
# to retrieve. Return what we have, as well as the last stream position that
@@ -678,12 +696,35 @@ def add_messages_txn(
678
696
],
679
697
)
680
698
681
- if remote_messages_by_destination :
682
- issue9533_logger .debug (
683
- "Queued outgoing to-device messages with stream_id %i for %s" ,
684
- stream_id ,
685
- list (remote_messages_by_destination .keys ()),
686
- )
699
+ for destination , edu in remote_messages_by_destination .items ():
700
+ if issue9533_logger .isEnabledFor (logging .DEBUG ):
701
+ issue9533_logger .debug (
702
+ "Queued outgoing to-device messages with "
703
+ "stream_id %i, EDU message_id %s, type %s for %s: %s" ,
704
+ stream_id ,
705
+ edu ["message_id" ],
706
+ edu ["type" ],
707
+ destination ,
708
+ [
709
+ f"{ user_id } /{ device_id } (msgid "
710
+ f"{ msg .get (EventContentFields .TO_DEVICE_MSGID )} )"
711
+ for (user_id , messages_by_device ) in edu ["messages" ].items ()
712
+ for (device_id , msg ) in messages_by_device .items ()
713
+ ],
714
+ )
715
+
716
+ for (user_id , messages_by_device ) in edu ["messages" ].items ():
717
+ for (device_id , msg ) in messages_by_device .items ():
718
+ with start_active_span ("store_outgoing_to_device_message" ):
719
+ set_tag (SynapseTags .TO_DEVICE_EDU_ID , edu ["sender" ])
720
+ set_tag (SynapseTags .TO_DEVICE_EDU_ID , edu ["message_id" ])
721
+ set_tag (SynapseTags .TO_DEVICE_TYPE , edu ["type" ])
722
+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT , user_id )
723
+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT_DEVICE , device_id )
724
+ set_tag (
725
+ SynapseTags .TO_DEVICE_MSGID ,
726
+ msg .get (EventContentFields .TO_DEVICE_MSGID ),
727
+ )
687
728
688
729
async with self ._device_inbox_id_gen .get_next () as stream_id :
689
730
now_ms = self ._clock .time_msec ()
@@ -801,7 +842,19 @@ def _add_messages_to_local_device_inbox_txn(
801
842
# Only insert into the local inbox if the device exists on
802
843
# this server
803
844
device_id = row ["device_id" ]
804
- message_json = json_encoder .encode (messages_by_device [device_id ])
845
+
846
+ with start_active_span ("serialise_to_device_message" ):
847
+ msg = messages_by_device [device_id ]
848
+ set_tag (SynapseTags .TO_DEVICE_TYPE , msg ["type" ])
849
+ set_tag (SynapseTags .TO_DEVICE_SENDER , msg ["sender" ])
850
+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT , user_id )
851
+ set_tag (SynapseTags .TO_DEVICE_RECIPIENT_DEVICE , device_id )
852
+ set_tag (
853
+ SynapseTags .TO_DEVICE_MSGID ,
854
+ msg ["content" ].get (EventContentFields .TO_DEVICE_MSGID ),
855
+ )
856
+ message_json = json_encoder .encode (msg )
857
+
805
858
messages_json_for_user [device_id ] = message_json
806
859
807
860
if messages_json_for_user :
@@ -821,15 +874,20 @@ def _add_messages_to_local_device_inbox_txn(
821
874
],
822
875
)
823
876
824
- issue9533_logger .debug (
825
- "Stored to-device messages with stream_id %i for %s" ,
826
- stream_id ,
827
- [
828
- (user_id , device_id )
829
- for (user_id , messages_by_device ) in local_by_user_then_device .items ()
830
- for device_id in messages_by_device .keys ()
831
- ],
832
- )
877
+ if issue9533_logger .isEnabledFor (logging .DEBUG ):
878
+ issue9533_logger .debug (
879
+ "Stored to-device messages with stream_id %i: %s" ,
880
+ stream_id ,
881
+ [
882
+ f"{ user_id } /{ device_id } (msgid "
883
+ f"{ msg ['content' ].get (EventContentFields .TO_DEVICE_MSGID )} )"
884
+ for (
885
+ user_id ,
886
+ messages_by_device ,
887
+ ) in messages_by_user_then_device .items ()
888
+ for (device_id , msg ) in messages_by_device .items ()
889
+ ],
890
+ )
833
891
834
892
835
893
class DeviceInboxBackgroundUpdateStore (SQLBaseStore ):
0 commit comments