Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Debug for device lists updates (#11760)
Browse files Browse the repository at this point in the history
Debug for #8631.

I'm having a hard time tracking down what's going wrong in that issue.
In the reported example, I could see server A sending federation traffic
to server B and all was well. Yet B reports out-of-sync device updates
from A.

I couldn't see what was _in_ the events being sent from A to B. So I
have added some crude logging to track

- when we have updates to send to a remote HS
- the edus we actually accumulate to send
- when a federation transaction includes a device list update edu
- when such an EDU is received

This is a bit of a sledgehammer.
  • Loading branch information
David Robertson authored Jan 20, 2022
1 parent fa583c2 commit f160fe1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/11760.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add optional debugging to investigate [issue 8631](https://github.com/matrix-org/synapse/issues/8631).
12 changes: 12 additions & 0 deletions synapse/federation/sender/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import synapse.server

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")

last_pdu_ts_metric = Gauge(
"synapse_federation_last_sent_pdu_time",
Expand Down Expand Up @@ -124,6 +125,17 @@ async def send_new_transaction(
len(pdus),
len(edus),
)
if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
device_list_updates = [
edu.content for edu in edus if edu.edu_type in DEVICE_UPDATE_EDUS
]
if device_list_updates:
issue_8631_logger.debug(
"about to send txn [%s] including device list updates: %s",
transaction.transaction_id,
device_list_updates,
)

# Actually send the transaction

Expand Down
15 changes: 15 additions & 0 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from synapse.util.versionstring import get_version_string

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")


class BaseFederationServerServlet(BaseFederationServlet):
Expand Down Expand Up @@ -95,6 +96,20 @@ async def on_PUT(
len(transaction_data.get("edus", [])),
)

if issue_8631_logger.isEnabledFor(logging.DEBUG):
DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
device_list_updates = [
edu.content
for edu in transaction_data.get("edus", [])
if edu.edu_type in DEVICE_UPDATE_EDUS
]
if device_list_updates:
issue_8631_logger.debug(
"received transaction [%s] including device list updates: %s",
transaction_id,
device_list_updates,
)

except Exception as e:
logger.exception(e)
return 400, {"error": "Invalid transaction"}
Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from synapse.server import HomeServer

logger = logging.getLogger(__name__)
issue_8631_logger = logging.getLogger("synapse.8631_debug")

DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
"drop_device_list_streams_non_unique_indexes"
Expand Down Expand Up @@ -229,6 +230,12 @@ async def get_device_updates_by_remote(
if not updates:
return now_stream_id, []

if issue_8631_logger.isEnabledFor(logging.DEBUG):
data = {(user, device): stream_id for user, device, stream_id, _ in updates}
issue_8631_logger.debug(
"device updates need to be sent to %s: %s", destination, data
)

# get the cross-signing keys of the users in the list, so that we can
# determine which of the device changes were cross-signing keys
users = {r[0] for r in updates}
Expand Down Expand Up @@ -365,6 +372,17 @@ async def get_device_updates_by_remote(
# and remove the length budgeting above.
results.append(("org.matrix.signing_key_update", result))

if issue_8631_logger.isEnabledFor(logging.DEBUG):
for (user_id, edu) in results:
issue_8631_logger.debug(
"device update to %s for %s from %s to %s: %s",
destination,
user_id,
from_stream_id,
last_processed_stream_id,
edu,
)

return last_processed_stream_id, results

def _get_device_updates_by_remote_txn(
Expand Down

0 comments on commit f160fe1

Please sign in to comment.