Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit a543a27

Browse files
committed
convert to background task and add tests
1 parent 09a0b67 commit a543a27

File tree

4 files changed

+214
-4
lines changed

4 files changed

+214
-4
lines changed

synapse/storage/databases/main/devices.py

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,11 @@ def __init__(self, database: DatabasePool, db_conn, hs):
955955
self._remove_duplicate_outbound_pokes,
956956
)
957957

958+
self.db_pool.updates.register_background_update_handler(
959+
"remove_deleted_devices_from_device_inbox",
960+
self._remove_deleted_devices_from_device_inbox,
961+
)
962+
958963
# a pair of background updates that were added during the 1.14 release cycle,
959964
# but replaced with 58/06dlols_unique_idx.py
960965
self.db_pool.updates.register_noop_background_update(
@@ -1045,6 +1050,63 @@ def _txn(txn):
10451050

10461051
return rows
10471052

1053+
async def _remove_deleted_devices_from_device_inbox(
1054+
self, progress: dict, batch_size: int
1055+
) -> int:
1056+
"""A background update that deletes all device_inboxes for deleted devices.
1057+
1058+
This should only need to be run once (when users upgrade to v1.45.0)
1059+
1060+
Args:
1061+
progress: dict used to store progress of this background update
1062+
batch_size: the maximum number of rows to retrieve in a single select query
1063+
1064+
Returns:
1065+
The number of deleted rows
1066+
"""
1067+
1068+
def _remove_deleted_devices_from_device_inbox_txn(
1069+
txn: LoggingTransaction,
1070+
) -> int:
1071+
1072+
sql = """
1073+
SELECT user_id, device_id, stream_id
1074+
FROM device_inbox
1075+
WHERE device_id
1076+
NOT IN (SELECT device_id FROM devices)
1077+
LIMIT ?;
1078+
"""
1079+
1080+
txn.execute(sql, (batch_size,))
1081+
rows = txn.fetchall()
1082+
1083+
row = None
1084+
for row in rows:
1085+
self.db_pool.simple_delete_txn(
1086+
txn,
1087+
"device_inbox",
1088+
{"user_id": row[0], "device_id": row[1], "stream_id": row[2]},
1089+
)
1090+
1091+
if row:
1092+
self.db_pool.updates._background_update_progress_txn(
1093+
txn, "remove_deleted_devices_from_device_inbox", row
1094+
)
1095+
1096+
return len(rows)
1097+
1098+
number_deleted = await self.db_pool.runInteraction(
1099+
"_remove_deleted_devices_from_device_inbox",
1100+
_remove_deleted_devices_from_device_inbox_txn,
1101+
)
1102+
1103+
if number_deleted < batch_size:
1104+
await self.db_pool.updates._end_background_update(
1105+
"remove_deleted_devices_from_device_inbox"
1106+
)
1107+
1108+
return number_deleted
1109+
10481110

10491111
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
10501112
def __init__(self, database: DatabasePool, db_conn, hs):
@@ -1121,7 +1183,7 @@ async def store_device(
11211183
raise StoreError(500, "Problem storing device.")
11221184

11231185
async def delete_device(self, user_id: str, device_id: str) -> None:
1124-
"""Delete a device.
1186+
"""Delete a device and this device_inbox.
11251187
11261188
Args:
11271189
user_id: The ID of the user which owns the device
@@ -1133,10 +1195,10 @@ async def delete_device(self, user_id: str, device_id: str) -> None:
11331195
desc="delete_device",
11341196
)
11351197

1136-
await self.db_pool.simple_delete_one(
1198+
await self.db_pool.simple_delete(
11371199
table="device_inbox",
11381200
keyvalues={"user_id": user_id, "device_id": device_id},
1139-
desc="delete_device",
1201+
desc="delete_device_inbox",
11401202
)
11411203

11421204
self.device_id_exists_cache.invalidate((user_id, device_id))
@@ -1161,7 +1223,7 @@ async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
11611223
column="device_id",
11621224
iterable=device_ids,
11631225
keyvalues={"user_id": user_id},
1164-
desc="delete_devices",
1226+
desc="delete_devices_inbox",
11651227
)
11661228

11671229
for device_id in device_ids:
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/* Copyright 2021 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
17+
--- Remove messages from the device_inbox table which where sent to an
18+
--- allready deleted device.
19+
--- This schould run as background task, it may take a little bit longer
20+
--- to finish.
21+
22+
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
23+
(6402, 'remove_deleted_devices_from_device_inbox', '{}');

tests/handlers/test_device.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,36 @@ def test_delete_device(self):
160160
# we'd like to check the access token was invalidated, but that's a
161161
# bit of a PITA.
162162

163+
def test_delete_device_and_device_inbox(self):
164+
self._record_users()
165+
166+
# add an device_inbox
167+
self.get_success(
168+
self.store.db_pool.simple_insert(
169+
"device_inbox",
170+
{
171+
"user_id": user1,
172+
"device_id": "abc",
173+
"stream_id": 1,
174+
"message_json": "{}",
175+
},
176+
)
177+
)
178+
179+
# delete the device
180+
self.get_success(self.handler.delete_device(user1, "abc"))
181+
182+
# check that the device_inbox was deleted
183+
self.get_failure(
184+
self.store.db_pool.simple_select_one(
185+
table="device_inbox",
186+
keyvalues={"user_id": user1, "device_id": "abc"},
187+
retcols=("user_id", "device_id"),
188+
desc="get_device_id_from_device_inbox",
189+
),
190+
synapse.api.errors.StoreError,
191+
)
192+
163193
def test_update_device(self):
164194
self._record_users()
165195

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# Copyright 2021 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an 'AS IS' BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from synapse.rest import admin
16+
from synapse.rest.client import devices
17+
18+
from tests.unittest import HomeserverTestCase
19+
20+
21+
class DevicesBackgroundUpdateStoreTestCase(HomeserverTestCase):
22+
23+
servlets = [
24+
admin.register_servlets,
25+
devices.register_servlets,
26+
]
27+
28+
def prepare(self, reactor, clock, hs):
29+
self.store = hs.get_datastore()
30+
self.user_id = self.register_user("foo", "pass")
31+
32+
def test_background_remove_deleted_devices_from_device_inbox(self):
33+
"""Test that the background task to delete old device_inboxes works properly."""
34+
35+
# create a valid device
36+
self.get_success(
37+
self.store.store_device(self.user_id, "cur_device", "display_name")
38+
)
39+
40+
# Add device_inbox to devices
41+
self.get_success(
42+
self.store.db_pool.simple_insert(
43+
"device_inbox",
44+
{
45+
"user_id": self.user_id,
46+
"device_id": "cur_device",
47+
"stream_id": 1,
48+
"message_json": "{}",
49+
},
50+
)
51+
)
52+
self.get_success(
53+
self.store.db_pool.simple_insert(
54+
"device_inbox",
55+
{
56+
"user_id": self.user_id,
57+
"device_id": "old_device",
58+
"stream_id": 2,
59+
"message_json": "{}",
60+
},
61+
)
62+
)
63+
64+
# Insert and run the background update.
65+
self.get_success(
66+
self.store.db_pool.simple_insert(
67+
"background_updates",
68+
{
69+
"update_name": "remove_deleted_devices_from_device_inbox",
70+
"progress_json": "{}",
71+
},
72+
)
73+
)
74+
75+
# ... and tell the DataStore that it hasn't finished all updates yet
76+
self.store.db_pool.updates._all_done = False
77+
78+
# Now let's actually drive the updates to completion
79+
while not self.get_success(
80+
self.store.db_pool.updates.has_completed_background_updates()
81+
):
82+
self.get_success(
83+
self.store.db_pool.updates.do_next_background_update(100), by=0.1
84+
)
85+
86+
# Make sure the background task deleted old device_inbox
87+
res = self.get_success(
88+
self.store.db_pool.simple_select_onecol(
89+
table="device_inbox",
90+
keyvalues={},
91+
retcol="device_id",
92+
desc="get_device_id_from_device_inbox",
93+
)
94+
)
95+
self.assertEqual(1, len(res))

0 commit comments

Comments
 (0)