This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit fe0f4a3

Move additional tasks to the background worker, part 3 (#8489)
1 parent: c9c0ad5 · commit: fe0f4a3

File tree: 8 files changed (+276 / -282 lines)

changelog.d/8489.feature (1 addition, 0 deletions)

@@ -0,0 +1 @@
+Allow running background tasks in a separate worker process.

synapse/app/phone_stats_home.py (3 additions, 11 deletions)

@@ -18,10 +18,7 @@
 
 from prometheus_client import Gauge
 
-from synapse.metrics.background_process_metrics import (
-    run_as_background_process,
-    wrap_as_background_process,
-)
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -152,13 +149,8 @@ def performance_stats_init():
     clock.looping_call(hs.get_datastore().generate_user_daily_visits, 5 * 60 * 1000)
 
     # monthly active user limiting functionality
-    def reap_monthly_active_users():
-        return run_as_background_process(
-            "reap_monthly_active_users", hs.get_datastore().reap_monthly_active_users
-        )
-
-    clock.looping_call(reap_monthly_active_users, 1000 * 60 * 60)
-    reap_monthly_active_users()
+    clock.looping_call(hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60)
+    hs.get_datastore().reap_monthly_active_users()
 
     @wrap_as_background_process("generate_monthly_active_users")
     async def generate_monthly_active_users():

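The hunk above shows the shape this commit applies repeatedly: rather than defining a local shim around run_as_background_process at each call site, the target coroutine itself is decorated with wrap_as_background_process, so it can be handed straight to the clock's looping_call and also invoked once at startup. A minimal sketch of the before/after shape, using a hypothetical reap_old_rows coroutine rather than the real datastore methods:

from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)


class OldStyleStore:
    async def reap_old_rows(self) -> None:
        ...  # hypothetical cleanup body


def schedule_old_style(clock, store: OldStyleStore):
    # Before: a hand-written shim turns the coroutine into a background process.
    def reap_old_rows():
        return run_as_background_process("reap_old_rows", store.reap_old_rows)

    clock.looping_call(reap_old_rows, 60 * 60 * 1000)
    reap_old_rows()


class NewStyleStore:
    @wrap_as_background_process("reap_old_rows")
    async def reap_old_rows(self) -> None:
        ...  # hypothetical cleanup body


def schedule_new_style(clock, store: NewStyleStore):
    # After: the decorated coroutine is scheduled and kicked off directly.
    clock.looping_call(store.reap_old_rows, 60 * 60 * 1000)
    store.reap_old_rows()
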
synapse/storage/databases/main/censor_events.py (7 additions, 8 deletions)

@@ -17,7 +17,7 @@
 from typing import TYPE_CHECKING
 
 from synapse.events.utils import prune_event_dict
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
@@ -35,14 +35,13 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
     def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
-        def _censor_redactions():
-            return run_as_background_process(
-                "_censor_redactions", self._censor_redactions
-            )
-
-        if self.hs.config.redaction_retention_period is not None:
-            hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000)
+        if (
+            hs.config.run_background_tasks
+            and self.hs.config.redaction_retention_period is not None
+        ):
+            hs.get_clock().looping_call(self._censor_redactions, 5 * 60 * 1000)
 
+    @wrap_as_background_process("_censor_redactions")
     async def _censor_redactions(self):
         """Censors all redactions older than the configured period that haven't
         been censored yet.

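The CensorEventsStore change above, and the device and event-federation stores below, share a second ingredient: the periodic call is only scheduled when hs.config.run_background_tasks is set, so that when background tasks move to a dedicated worker the other processes still define the task but never run it on a timer. A rough sketch of that gating shape, with an illustrative store and task name that are not part of the commit:

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool


class ExampleCleanupStore(SQLBaseStore):
    def __init__(self, database: DatabasePool, db_conn, hs):
        super().__init__(database, db_conn, hs)

        # Only the process configured to run background tasks schedules the loop.
        if hs.config.run_background_tasks:
            self._clock.looping_call(self._cleanup_old_rows, 60 * 60 * 1000)

    @wrap_as_background_process("cleanup_old_rows")
    async def _cleanup_old_rows(self) -> None:
        ...  # hypothetical periodic task body
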
synapse/storage/databases/main/devices.py (101 additions, 95 deletions)

@@ -25,7 +25,7 @@
     trace,
     whitelisted_homeserver,
 )
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -48,6 +48,14 @@
 
 
 class DeviceWorkerStore(SQLBaseStore):
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        if hs.config.run_background_tasks:
+            self._clock.looping_call(
+                self._prune_old_outbound_device_pokes, 60 * 60 * 1000
+            )
+
     async def get_device(self, user_id: str, device_id: str) -> Dict[str, Any]:
         """Retrieve a device. Only returns devices that are not marked as
         hidden.
@@ -772,6 +780,98 @@ async def remove_dehydrated_device(self, user_id: str, device_id: str) -> bool:
         )
         return count >= 1
 
+    @wrap_as_background_process("prune_old_outbound_device_pokes")
+    async def _prune_old_outbound_device_pokes(
+        self, prune_age: int = 24 * 60 * 60 * 1000
+    ) -> None:
+        """Delete old entries out of the device_lists_outbound_pokes to ensure
+        that we don't fill up due to dead servers.
+
+        Normally, we try to send device updates as a delta since a previous known point:
+        this is done by setting the prev_id in the m.device_list_update EDU. However,
+        for that to work, we have to have a complete record of each change to
+        each device, which can add up to quite a lot of data.
+
+        An alternative mechanism is that, if the remote server sees that it has missed
+        an entry in the stream_id sequence for a given user, it will request a full
+        list of that user's devices. Hence, we can reduce the amount of data we have to
+        store (and transmit in some future transaction), by clearing almost everything
+        for a given destination out of the database, and having the remote server
+        resync.
+
+        All we need to do is make sure we keep at least one row for each
+        (user, destination) pair, to remind us to send a m.device_list_update EDU for
+        that user when the destination comes back. It doesn't matter which device
+        we keep.
+        """
+        yesterday = self._clock.time_msec() - prune_age
+
+        def _prune_txn(txn):
+            # look for (user, destination) pairs which have an update older than
+            # the cutoff.
+            #
+            # For each pair, we also need to know the most recent stream_id, and
+            # an arbitrary device_id at that stream_id.
+            select_sql = """
+                SELECT
+                    dlop1.destination,
+                    dlop1.user_id,
+                    MAX(dlop1.stream_id) AS stream_id,
+                    (SELECT MIN(dlop2.device_id) AS device_id FROM
+                        device_lists_outbound_pokes dlop2
+                        WHERE dlop2.destination = dlop1.destination AND
+                            dlop2.user_id=dlop1.user_id AND
+                            dlop2.stream_id=MAX(dlop1.stream_id)
+                    )
+                FROM device_lists_outbound_pokes dlop1
+                    GROUP BY destination, user_id
+                    HAVING min(ts) < ? AND count(*) > 1
+            """
+
+            txn.execute(select_sql, (yesterday,))
+            rows = txn.fetchall()
+
+            if not rows:
+                return
+
+            logger.info(
+                "Pruning old outbound device list updates for %i users/destinations: %s",
+                len(rows),
+                shortstr((row[0], row[1]) for row in rows),
+            )
+
+            # we want to keep the update with the highest stream_id for each user.
+            #
+            # there might be more than one update (with different device_ids) with the
+            # same stream_id, so we also delete all but one rows with the max stream id.
+            delete_sql = """
+                DELETE FROM device_lists_outbound_pokes
+                WHERE destination = ? AND user_id = ? AND (
+                    stream_id < ? OR
+                    (stream_id = ? AND device_id != ?)
+                )
+            """
+            count = 0
+            for (destination, user_id, stream_id, device_id) in rows:
+                txn.execute(
+                    delete_sql, (destination, user_id, stream_id, stream_id, device_id)
+                )
+                count += txn.rowcount
+
+            # Since we've deleted unsent deltas, we need to remove the entry
+            # of last successful sent so that the prev_ids are correctly set.
+            sql = """
+                DELETE FROM device_lists_outbound_last_success
+                WHERE destination = ? AND user_id = ?
+            """
+            txn.executemany(sql, ((row[0], row[1]) for row in rows))
+
+            logger.info("Pruned %d device list outbound pokes", count)
+
+        await self.db_pool.runInteraction(
+            "_prune_old_outbound_device_pokes", _prune_txn,
+        )
+
 
 class DeviceBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
@@ -908,8 +1008,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             name="device_id_exists", keylen=2, max_entries=10000
         )
 
-        self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
-
     async def store_device(
         self, user_id: str, device_id: str, initial_device_display_name: Optional[str]
     ) -> bool:
@@ -1267,95 +1365,3 @@ def _add_device_outbound_poke_to_stream_txn(
                 for device_id in device_ids
             ],
         )
-
-    def _prune_old_outbound_device_pokes(self, prune_age: int = 24 * 60 * 60 * 1000):
-        """Delete old entries out of the device_lists_outbound_pokes to ensure
-        that we don't fill up due to dead servers.
-
-        Normally, we try to send device updates as a delta since a previous known point:
-        this is done by setting the prev_id in the m.device_list_update EDU. However,
-        for that to work, we have to have a complete record of each change to
-        each device, which can add up to quite a lot of data.
-
-        An alternative mechanism is that, if the remote server sees that it has missed
-        an entry in the stream_id sequence for a given user, it will request a full
-        list of that user's devices. Hence, we can reduce the amount of data we have to
-        store (and transmit in some future transaction), by clearing almost everything
-        for a given destination out of the database, and having the remote server
-        resync.
-
-        All we need to do is make sure we keep at least one row for each
-        (user, destination) pair, to remind us to send a m.device_list_update EDU for
-        that user when the destination comes back. It doesn't matter which device
-        we keep.
-        """
-        yesterday = self._clock.time_msec() - prune_age
-
-        def _prune_txn(txn):
-            # look for (user, destination) pairs which have an update older than
-            # the cutoff.
-            #
-            # For each pair, we also need to know the most recent stream_id, and
-            # an arbitrary device_id at that stream_id.
-            select_sql = """
-                SELECT
-                    dlop1.destination,
-                    dlop1.user_id,
-                    MAX(dlop1.stream_id) AS stream_id,
-                    (SELECT MIN(dlop2.device_id) AS device_id FROM
-                        device_lists_outbound_pokes dlop2
-                        WHERE dlop2.destination = dlop1.destination AND
-                            dlop2.user_id=dlop1.user_id AND
-                            dlop2.stream_id=MAX(dlop1.stream_id)
-                    )
-                FROM device_lists_outbound_pokes dlop1
-                    GROUP BY destination, user_id
-                    HAVING min(ts) < ? AND count(*) > 1
-            """
-
-            txn.execute(select_sql, (yesterday,))
-            rows = txn.fetchall()
-
-            if not rows:
-                return
-
-            logger.info(
-                "Pruning old outbound device list updates for %i users/destinations: %s",
-                len(rows),
-                shortstr((row[0], row[1]) for row in rows),
-            )
-
-            # we want to keep the update with the highest stream_id for each user.
-            #
-            # there might be more than one update (with different device_ids) with the
-            # same stream_id, so we also delete all but one rows with the max stream id.
-            delete_sql = """
-                DELETE FROM device_lists_outbound_pokes
-                WHERE destination = ? AND user_id = ? AND (
-                    stream_id < ? OR
-                    (stream_id = ? AND device_id != ?)
-                )
-            """
-            count = 0
-            for (destination, user_id, stream_id, device_id) in rows:
-                txn.execute(
-                    delete_sql, (destination, user_id, stream_id, stream_id, device_id)
-                )
-                count += txn.rowcount
-
-            # Since we've deleted unsent deltas, we need to remove the entry
-            # of last successful sent so that the prev_ids are correctly set.
-            sql = """
-                DELETE FROM device_lists_outbound_last_success
-                WHERE destination = ? AND user_id = ?
-            """
-            txn.executemany(sql, ((row[0], row[1]) for row in rows))
-
-            logger.info("Pruned %d device list outbound pokes", count)
-
-        return run_as_background_process(
-            "prune_old_outbound_device_pokes",
-            self.db_pool.runInteraction,
-            "_prune_old_outbound_device_pokes",
-            _prune_txn,
-        )

synapse/storage/databases/main/event_federation.py (31 additions, 29 deletions)

@@ -19,7 +19,7 @@
 
 from synapse.api.errors import StoreError
 from synapse.events import EventBase
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
@@ -32,6 +32,14 @@
 
 
 class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBaseStore):
+    def __init__(self, database: DatabasePool, db_conn, hs):
+        super().__init__(database, db_conn, hs)
+
+        if hs.config.run_background_tasks:
+            hs.get_clock().looping_call(
+                self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+            )
+
     async def get_auth_chain(
         self, event_ids: Collection[str], include_given: bool = False
     ) -> List[EventBase]:
@@ -586,6 +594,28 @@ async def get_successor_events(self, event_ids: Iterable[str]) -> List[str]:
 
         return [row["event_id"] for row in rows]
 
+    @wrap_as_background_process("delete_old_forward_extrem_cache")
+    async def _delete_old_forward_extrem_cache(self) -> None:
+        def _delete_old_forward_extrem_cache_txn(txn):
+            # Delete entries older than a month, while making sure we don't delete
+            # the only entries for a room.
+            sql = """
+                DELETE FROM stream_ordering_to_exterm
+                WHERE
+                room_id IN (
+                    SELECT room_id
+                    FROM stream_ordering_to_exterm
+                    WHERE stream_ordering > ?
+                ) AND stream_ordering < ?
+            """
+            txn.execute(
+                sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)
+            )
+
+        await self.db_pool.runInteraction(
+            "_delete_old_forward_extrem_cache", _delete_old_forward_extrem_cache_txn,
+        )
+
 
 class EventFederationStore(EventFederationWorkerStore):
     """ Responsible for storing and serving up the various graphs associated
@@ -606,34 +636,6 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
         )
 
-        hs.get_clock().looping_call(
-            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
-        )
-
-    def _delete_old_forward_extrem_cache(self):
-        def _delete_old_forward_extrem_cache_txn(txn):
-            # Delete entries older than a month, while making sure we don't delete
-            # the only entries for a room.
-            sql = """
-                DELETE FROM stream_ordering_to_exterm
-                WHERE
-                room_id IN (
-                    SELECT room_id
-                    FROM stream_ordering_to_exterm
-                    WHERE stream_ordering > ?
-                ) AND stream_ordering < ?
-            """
-            txn.execute(
-                sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago)
-            )
-
-        return run_as_background_process(
-            "delete_old_forward_extrem_cache",
-            self.db_pool.runInteraction,
-            "_delete_old_forward_extrem_cache",
-            _delete_old_forward_extrem_cache_txn,
-        )
-
     async def clean_room_for_join(self, room_id):
         return await self.db_pool.runInteraction(
            "clean_room_for_join", self._clean_room_for_join_txn, room_id

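One further detail of the conversion in the two storage files above: the removed helpers handed self.db_pool.runInteraction and its arguments through run_as_background_process, while the new decorated coroutines await the interaction themselves. A condensed, illustrative comparison; the store, table, and transaction function here are placeholders, not code from the commit:

from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)


class ExamplePruningStore:
    def __init__(self, db_pool):
        self.db_pool = db_pool  # stand-in for the real DatabasePool

    @staticmethod
    def _prune_txn(txn):
        # Placeholder transaction body.
        txn.execute("DELETE FROM example_table WHERE ts < ?", (0,))

    def prune_old_style(self):
        # Before: forward the interaction through run_as_background_process.
        return run_as_background_process(
            "prune_example",
            self.db_pool.runInteraction,
            "prune_example",
            self._prune_txn,
        )

    @wrap_as_background_process("prune_example")
    async def prune_new_style(self) -> None:
        # After: the decorated coroutine awaits the interaction directly.
        await self.db_pool.runInteraction("prune_example", self._prune_txn)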