Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Delete device messages asynchronously and in staged batches #16240

Merged
merged 13 commits into from
Sep 6, 2023
Prev Previous commit
Next Next commit
Address review comments
  • Loading branch information
Mathieu Velten committed Sep 5, 2023
commit 61f2c545d31870c3bd0171d018605eb1db21c74c
13 changes: 7 additions & 6 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@

logger = logging.getLogger(__name__)

DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
MAX_DEVICE_DISPLAY_NAME_LEN = 100
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000

Expand Down Expand Up @@ -425,7 +426,7 @@ def __init__(self, hs: "HomeServer"):
)

self._task_scheduler.register_action(
self._delete_device_messages, "delete_device_messages"
self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
)

def _check_device_name_length(self, name: Optional[str]) -> None:
Expand Down Expand Up @@ -571,7 +572,7 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:

# Delete device messages asynchronously and in batches using the task scheduler
await self._task_scheduler.schedule_task(
"delete_device_messages",
DELETE_DEVICE_MSGS_TASK_NAME,
resource_id=device_id,
params={
"user_id": user_id,
Expand Down Expand Up @@ -599,10 +600,10 @@ async def _delete_device_messages(
up_to_stream_id = task.params["up_to_stream_id"]

res = await self.store.delete_messages_for_device(
user_id,
device_id,
up_to_stream_id,
DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
Expand Down
16 changes: 13 additions & 3 deletions synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
Expand Down Expand Up @@ -268,6 +269,7 @@ def __init__(self, hs: "HomeServer"):
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._device_handler = hs.get_device_handler()
self._task_scheduler = hs.get_task_scheduler()

self.should_calculate_push_rules = hs.config.push.enable_push

Expand Down Expand Up @@ -360,11 +362,19 @@ async def _wait_for_sync_for_user(
# (since we now know that the device has received them)
if since_token is not None:
since_stream_id = since_token.to_device_key
deleted = await self.store.delete_messages_for_device(
sync_config.user.to_string(), sync_config.device_id, since_stream_id
# Delete device messages asynchronously and in batches using the task scheduler
await self._task_scheduler.schedule_task(
DELETE_DEVICE_MSGS_TASK_NAME,
resource_id=sync_config.device_id,
params={
"user_id": sync_config.user.to_string(),
"device_id": sync_config.device_id,
"up_to_stream_id": since_stream_id,
},
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
"Deletion of to-device messages up to %d scheduled",
since_stream_id,
)

if timeout == 0 or since_token is None or full_state:
Expand Down
17 changes: 8 additions & 9 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ async def delete_messages_for_device(
user_id: str,
device_id: Optional[str],
up_to_stream_id: int,
limit: Optional[int] = None,
limit: int,
) -> int:
"""
Args:
Expand Down Expand Up @@ -477,17 +477,16 @@ async def delete_messages_for_device(
log_kv({"message": "No changes in cache since last check"})
return 0

ROW_ID_NAME = self.database_engine.row_id_name

def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
sql = (
f"""
DELETE FROM device_inbox WHERE {self.database_engine.row_id_name} IN (
SELECT {self.database_engine.row_id_name} FROM device_inbox
sql = f"""
DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
SELECT {ROW_ID_NAME} FROM device_inbox
WHERE user_id = ? AND device_id = ? AND stream_id <= ?
LIMIT {limit}
)
"""
)
if limit:
sql += f" LIMIT {limit}"
sql += ")"

txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
Expand Down