This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 1c5526b

Use execute_batch in more places

1 parent eee6fcf commit 1c5526b

11 files changed: 25 additions & 31 deletions

synapse/storage/database.py

Lines changed: 6 additions & 0 deletions
@@ -262,6 +262,12 @@ def description(self) -> Any:
         return self.txn.description
 
     def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
+        """Similar to `executemany`, except `txn.rowcount` will not be correct
+        afterwards.
+
+        More efficient than `executemany` on PostgreSQL
+        """
+
         if isinstance(self.database_engine, PostgresEngine):
             from psycopg2.extras import execute_batch  # type: ignore
 
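To make the change easier to follow, here is a rough standalone sketch of the per-engine dispatch the helper performs. It is not Synapse's actual LoggingTransaction method: the cursor argument and the boolean engine check are simplified stand-ins, and only the psycopg2 import mirrors the hunk above.

from typing import Any, Iterable


def execute_batch_sketch(
    cur, is_postgres: bool, sql: str, args: Iterable[Iterable[Any]]
) -> None:
    """Illustrative sketch only; `cur` is a plain DB-API cursor."""
    if is_postgres:
        # psycopg2 groups many parameter sets into far fewer server round
        # trips (paged internally, default page_size=100), which is why it
        # beats a plain executemany against PostgreSQL.
        from psycopg2.extras import execute_batch

        execute_batch(cur, sql, args)
    else:
        # Other DB-API drivers (e.g. sqlite3) just fall back to executemany.
        cur.executemany(sql, args)

Keeping both paths behind one method means the call sites updated in the rest of this commit do not have to branch on the database engine themselves.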
synapse/storage/databases/main/devices.py

Lines changed: 2 additions & 2 deletions
@@ -897,7 +897,7 @@ def _prune_txn(txn):
                 DELETE FROM device_lists_outbound_last_success
                 WHERE destination = ? AND user_id = ?
             """
-            txn.executemany(sql, ((row[0], row[1]) for row in rows))
+            txn.execute_batch(sql, ((row[0], row[1]) for row in rows))
 
             logger.info("Pruned %d device list outbound pokes", count)
 
@@ -1343,7 +1343,7 @@ def _add_device_change_to_stream_txn(
 
         # Delete older entries in the table, as we really only care about
        # when the latest change happened.
-        txn.executemany(
+        txn.execute_batch(
             """
                 DELETE FROM device_lists_stream
                 WHERE user_id = ? AND device_id = ? AND stream_id < ?

synapse/storage/databases/main/event_push_actions.py

Lines changed: 2 additions & 2 deletions
@@ -487,7 +487,7 @@ def _add_push_actions_to_staging_txn(txn):
                 VALUES (?, ?, ?, ?, ?, ?)
             """
 
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     _gen_entry(user_id, actions)
@@ -803,7 +803,7 @@ def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
             ],
         )
 
-        txn.executemany(
+        txn.execute_batch(
             """
                 UPDATE event_push_summary
                 SET notif_count = ?, unread_count = ?, stream_ordering = ?

synapse/storage/databases/main/events_bg_updates.py

Lines changed: 2 additions & 10 deletions
@@ -139,8 +139,6 @@ async def _background_reindex_fields_sender(self, progress, batch_size):
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
-        INSERT_CLUMP_SIZE = 1000
-
         def reindex_txn(txn):
             sql = (
                 "SELECT stream_ordering, event_id, json FROM events"
@@ -178,9 +176,7 @@ def reindex_txn(txn):
 
             sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
 
-            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            txn.execute_batch(sql, update_rows)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,
@@ -210,8 +206,6 @@ async def _background_reindex_origin_server_ts(self, progress, batch_size):
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
 
-        INSERT_CLUMP_SIZE = 1000
-
         def reindex_search_txn(txn):
             sql = (
                 "SELECT stream_ordering, event_id FROM events"
@@ -256,9 +250,7 @@ def reindex_search_txn(txn):
 
             sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
 
-            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
+            txn.execute_batch(sql, rows_to_update)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,

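The deleted lines here (and the matching ones in roommember.py further down) drop the hand-rolled INSERT_CLUMP_SIZE batching: execute_batch now owns the batching, so each call site shrinks to a single call. Schematically, using the same names as the hunk above:

# Before: chunk the parameter list by hand, one executemany per 1000 rows.
INSERT_CLUMP_SIZE = 1000
for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
    clump = update_rows[index : index + INSERT_CLUMP_SIZE]
    txn.executemany(sql, clump)

# After: pass everything through and let execute_batch group the statements
# however the underlying driver prefers.
txn.execute_batch(sql, update_rows)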
synapse/storage/databases/main/media_repository.py

Lines changed: 5 additions & 5 deletions
@@ -417,7 +417,7 @@ def update_cache_txn(txn):
                 " WHERE media_origin = ? AND media_id = ?"
             )
 
-            txn.executemany(
+            txn.execute_batch(
                 sql,
                 (
                     (time_ms, media_origin, media_id)
@@ -430,7 +430,7 @@ def update_cache_txn(txn):
                 " WHERE media_id = ?"
             )
 
-            txn.executemany(sql, ((time_ms, media_id) for media_id in local_media))
+            txn.execute_batch(sql, ((time_ms, media_id) for media_id in local_media))
 
         return await self.db_pool.runInteraction(
             "update_cached_last_access_time", update_cache_txn
@@ -557,7 +557,7 @@ async def delete_url_cache(self, media_ids):
         sql = "DELETE FROM local_media_repository_url_cache WHERE media_id = ?"
 
         def _delete_url_cache_txn(txn):
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
         return await self.db_pool.runInteraction(
             "delete_url_cache", _delete_url_cache_txn
@@ -586,11 +586,11 @@ async def delete_url_cache_media(self, media_ids):
         def _delete_url_cache_media_txn(txn):
             sql = "DELETE FROM local_media_repository WHERE media_id = ?"
 
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
             sql = "DELETE FROM local_media_repository_thumbnails WHERE media_id = ?"
 
-            txn.executemany(sql, [(media_id,) for media_id in media_ids])
+            txn.execute_batch(sql, [(media_id,) for media_id in media_ids])
 
         return await self.db_pool.runInteraction(
             "delete_url_cache_media", _delete_url_cache_media_txn

synapse/storage/databases/main/purge_events.py

Lines changed: 1 addition & 1 deletion
@@ -172,7 +172,7 @@ def _purge_history_txn(self, txn, room_id, token, delete_local_events):
         )
 
         # Update backward extremeties
-        txn.executemany(
+        txn.execute_batch(
             "INSERT INTO event_backward_extremities (room_id, event_id)"
             " VALUES (?, ?)",
             [(room_id, event_id) for event_id, in new_backwards_extrems],

synapse/storage/databases/main/registration.py

Lines changed: 1 addition & 1 deletion
@@ -1104,7 +1104,7 @@ def _bg_user_threepids_grandfather_txn(txn):
                 FROM user_threepids
             """
 
-            txn.executemany(sql, [(id_server,) for id_server in id_servers])
+            txn.execute_batch(sql, [(id_server,) for id_server in id_servers])
 
         if id_servers:
             await self.db_pool.runInteraction(

synapse/storage/databases/main/roommember.py

Lines changed: 1 addition & 5 deletions
@@ -873,8 +873,6 @@ async def _background_add_membership_profile(self, progress, batch_size):
             "max_stream_id_exclusive", self._stream_order_on_start + 1
         )
 
-        INSERT_CLUMP_SIZE = 1000
-
         def add_membership_profile_txn(txn):
             sql = """
                 SELECT stream_ordering, event_id, events.room_id, event_json.json
@@ -915,9 +913,7 @@ def add_membership_profile_txn(txn):
                 UPDATE room_memberships SET display_name = ?, avatar_url = ?
                 WHERE event_id = ? AND room_id = ?
             """
-            for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
-                clump = to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(to_update_sql, clump)
+            txn.execute_batch(to_update_sql, to_update)
 
             progress = {
                 "target_min_stream_id_inclusive": target_min_stream_id,

synapse/storage/databases/main/schema/delta/59/01ignored_user.py

Lines changed: 1 addition & 1 deletion
@@ -55,7 +55,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
         # { "ignored_users": "@someone:example.org": {} }
         ignored_users = content.get("ignored_users", {})
         if isinstance(ignored_users, dict) and ignored_users:
-            cur.executemany(insert_sql, [(user_id, u) for u in ignored_users])
+            cur.execute_batch(insert_sql, [(user_id, u) for u in ignored_users])
 
     # Add indexes after inserting data for efficiency.
     logger.info("Adding constraints to ignored_users table")

synapse/storage/databases/main/search.py

Lines changed: 2 additions & 2 deletions
@@ -63,7 +63,7 @@ def store_search_entries_txn(self, txn, entries):
                 for entry in entries
             )
 
-            txn.executemany(sql, args)
+            txn.execute_batch(sql, args)
 
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = (
@@ -75,7 +75,7 @@ def store_search_entries_txn(self, txn, entries):
                 for entry in entries
             )
 
-            txn.executemany(sql, args)
+            txn.execute_batch(sql, args)
         else:
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
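One caveat worth repeating from the new docstring in database.py: after execute_batch, txn.rowcount can no longer be trusted, so call sites that report how many rows were affected should compute that number some other way. A hypothetical helper illustrating the pattern (the function name and the way `rows` is obtained are made up; the table and log message mirror the devices.py hunk above):

import logging

logger = logging.getLogger(__name__)


def prune_outbound_last_success(txn, rows):
    # `txn` is assumed to be a transaction wrapper exposing execute_batch,
    # e.g. the object passed in by db_pool.runInteraction.
    txn.execute_batch(
        "DELETE FROM device_lists_outbound_last_success"
        " WHERE destination = ? AND user_id = ?",
        rows,
    )
    # Don't read txn.rowcount here; derive the count from the inputs (or a
    # prior SELECT) instead.
    logger.info("Pruned %d device list outbound pokes", len(rows))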
