Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 70fe2ba

Browse files
squahtx authored and H-Shay committed
Remove option to skip locking of tables during emulated upserts (#14469)
To perform an emulated upsert into a table safely, we must either:

* lock the table,
* be the only writer upserting into the table
* or rely on another unique index being present.

When the 2nd or 3rd cases were applicable, we previously avoided locking the table as an optimization. However, as seen in #14406, it is easy to slip up when adding new schema deltas and corrupt the database.

The only time we lock when performing emulated upserts is while waiting for background updates on postgres. On sqlite, we do no locking at all.

Let's remove the option to skip locking tables, so that we don't shoot ourselves in the foot again.

Signed-off-by: Sean Quah <seanq@matrix.org>
1 parent 5196a1e commit 70fe2ba

File tree

10 files changed

+19
-74
lines changed

10 files changed

+19
-74
lines changed

changelog.d/14469.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove option to skip locking of tables when performing emulated upserts, to avoid a class of bugs in future.

synapse/storage/database.py

Lines changed: 18 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1129,7 +1129,6 @@ async def simple_upsert(
11291129
values: Dict[str, Any],
11301130
insertion_values: Optional[Dict[str, Any]] = None,
11311131
desc: str = "simple_upsert",
1132-
lock: bool = True,
11331132
) -> bool:
11341133
"""Insert a row with values + insertion_values; on conflict, update with values.
11351134
@@ -1154,21 +1153,12 @@ async def simple_upsert(
11541153
requiring that a unique index exist on the column names used to detect a
11551154
conflict (i.e. `keyvalues.keys()`).
11561155
1157-
If there is no such index, we can "emulate" an upsert with a SELECT followed
1158-
by either an INSERT or an UPDATE. This is unsafe: we cannot make the same
1159-
atomicity guarantees that a native upsert can and are very vulnerable to races
1160-
and crashes. Therefore if we wish to upsert without an appropriate unique index,
1161-
we must either:
1162-
1163-
1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
1164-
2. VERY CAREFULLY ensure that we are the only thread and worker which will be
1165-
writing to this table, in which case we can proceed without a lock
1166-
(`lock=False`).
1167-
1168-
Generally speaking, you should use `lock=True`. If the table in question has a
1169-
unique index[*], this class will use a native upsert (which is atomic and so can
1170-
ignore the `lock` argument). Otherwise this class will use an emulated upsert,
1171-
in which case we want the safer option unless we been VERY CAREFUL.
1156+
If there is no such index yet[*], we can "emulate" an upsert with a SELECT
1157+
followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters
1158+
run at the SERIALIZABLE isolation level: we cannot make the same atomicity
1159+
guarantees that a native upsert can and are very vulnerable to races and
1160+
crashes. Therefore to upsert without an appropriate unique index, we acquire a
1161+
table-level lock before the emulated upsert.
11721162
11731163
[*]: Some tables have unique indices added to them in the background. Those
11741164
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
@@ -1189,7 +1179,6 @@ async def simple_upsert(
11891179
values: The nonunique columns and their new values
11901180
insertion_values: additional key/values to use only when inserting
11911181
desc: description of the transaction, for logging and metrics
1192-
lock: True to lock the table when doing the upsert.
11931182
Returns:
11941183
Returns True if a row was inserted or updated (i.e. if `values` is
11951184
not empty then this always returns True)
@@ -1209,7 +1198,6 @@ async def simple_upsert(
12091198
keyvalues,
12101199
values,
12111200
insertion_values,
1212-
lock=lock,
12131201
db_autocommit=autocommit,
12141202
)
12151203
except self.engine.module.IntegrityError as e:
@@ -1232,7 +1220,6 @@ def simple_upsert_txn(
12321220
values: Dict[str, Any],
12331221
insertion_values: Optional[Dict[str, Any]] = None,
12341222
where_clause: Optional[str] = None,
1235-
lock: bool = True,
12361223
) -> bool:
12371224
"""
12381225
Pick the UPSERT method which works best on the platform. Either the
@@ -1245,8 +1232,6 @@ def simple_upsert_txn(
12451232
values: The nonunique columns and their new values
12461233
insertion_values: additional key/values to use only when inserting
12471234
where_clause: An index predicate to apply to the upsert.
1248-
lock: True to lock the table when doing the upsert. Unused when performing
1249-
a native upsert.
12501235
Returns:
12511236
Returns True if a row was inserted or updated (i.e. if `values` is
12521237
not empty then this always returns True)
@@ -1270,7 +1255,6 @@ def simple_upsert_txn(
12701255
values,
12711256
insertion_values=insertion_values,
12721257
where_clause=where_clause,
1273-
lock=lock,
12741258
)
12751259

12761260
def simple_upsert_txn_emulated(
@@ -1291,14 +1275,15 @@ def simple_upsert_txn_emulated(
12911275
insertion_values: additional key/values to use only when inserting
12921276
where_clause: An index predicate to apply to the upsert.
12931277
lock: True to lock the table when doing the upsert.
1278+
Must not be False unless the table has already been locked.
12941279
Returns:
12951280
Returns True if a row was inserted or updated (i.e. if `values` is
12961281
not empty then this always returns True)
12971282
"""
12981283
insertion_values = insertion_values or {}
12991284

1300-
# We need to lock the table :(, unless we're *really* careful
13011285
if lock:
1286+
# We need to lock the table :(
13021287
self.engine.lock_table(txn, table)
13031288

13041289
def _getwhere(key: str) -> str:
@@ -1406,7 +1391,6 @@ async def simple_upsert_many(
14061391
value_names: Collection[str],
14071392
value_values: Collection[Collection[Any]],
14081393
desc: str,
1409-
lock: bool = True,
14101394
) -> None:
14111395
"""
14121396
Upsert, many times.
@@ -1418,8 +1402,6 @@ async def simple_upsert_many(
14181402
value_names: The value column names
14191403
value_values: A list of each row's value column values.
14201404
Ignored if value_names is empty.
1421-
lock: True to lock the table when doing the upsert. Unused when performing
1422-
a native upsert.
14231405
"""
14241406

14251407
# We can autocommit if it safe to upsert
@@ -1433,7 +1415,6 @@ async def simple_upsert_many(
14331415
key_values,
14341416
value_names,
14351417
value_values,
1436-
lock=lock,
14371418
db_autocommit=autocommit,
14381419
)
14391420

@@ -1445,7 +1426,6 @@ def simple_upsert_many_txn(
14451426
key_values: Collection[Iterable[Any]],
14461427
value_names: Collection[str],
14471428
value_values: Iterable[Iterable[Any]],
1448-
lock: bool = True,
14491429
) -> None:
14501430
"""
14511431
Upsert, many times.
@@ -1457,16 +1437,19 @@ def simple_upsert_many_txn(
14571437
value_names: The value column names
14581438
value_values: A list of each row's value column values.
14591439
Ignored if value_names is empty.
1460-
lock: True to lock the table when doing the upsert. Unused when performing
1461-
a native upsert.
14621440
"""
14631441
if table not in self._unsafe_to_upsert_tables:
14641442
return self.simple_upsert_many_txn_native_upsert(
14651443
txn, table, key_names, key_values, value_names, value_values
14661444
)
14671445
else:
14681446
return self.simple_upsert_many_txn_emulated(
1469-
txn, table, key_names, key_values, value_names, value_values, lock=lock
1447+
txn,
1448+
table,
1449+
key_names,
1450+
key_values,
1451+
value_names,
1452+
value_values,
14701453
)
14711454

14721455
def simple_upsert_many_txn_emulated(
@@ -1477,7 +1460,6 @@ def simple_upsert_many_txn_emulated(
14771460
key_values: Collection[Iterable[Any]],
14781461
value_names: Collection[str],
14791462
value_values: Iterable[Iterable[Any]],
1480-
lock: bool = True,
14811463
) -> None:
14821464
"""
14831465
Upsert, many times, but without native UPSERT support or batching.
@@ -1489,18 +1471,16 @@ def simple_upsert_many_txn_emulated(
14891471
value_names: The value column names
14901472
value_values: A list of each row's value column values.
14911473
Ignored if value_names is empty.
1492-
lock: True to lock the table when doing the upsert.
14931474
"""
14941475
# No value columns, therefore make a blank list so that the following
14951476
# zip() works correctly.
14961477
if not value_names:
14971478
value_values = [() for x in range(len(key_values))]
14981479

1499-
if lock:
1500-
# Lock the table just once, to prevent it being done once per row.
1501-
# Note that, according to Postgres' documentation, once obtained,
1502-
# the lock is held for the remainder of the current transaction.
1503-
self.engine.lock_table(txn, "user_ips")
1480+
# Lock the table just once, to prevent it being done once per row.
1481+
# Note that, according to Postgres' documentation, once obtained,
1482+
# the lock is held for the remainder of the current transaction.
1483+
self.engine.lock_table(txn, "user_ips")
15041484

15051485
for keyv, valv in zip(key_values, value_values):
15061486
_keys = {x: y for x, y in zip(key_names, keyv)}

synapse/storage/databases/main/account_data.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -449,9 +449,6 @@ async def add_account_data_to_room(
449449
content_json = json_encoder.encode(content)
450450

451451
async with self._account_data_id_gen.get_next() as next_id:
452-
# no need to lock here as room_account_data has a unique constraint
453-
# on (user_id, room_id, account_data_type) so simple_upsert will
454-
# retry if there is a conflict.
455452
await self.db_pool.simple_upsert(
456453
desc="add_room_account_data",
457454
table="room_account_data",
@@ -461,7 +458,6 @@ async def add_account_data_to_room(
461458
"account_data_type": account_data_type,
462459
},
463460
values={"stream_id": next_id, "content": content_json},
464-
lock=False,
465461
)
466462

467463
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -517,15 +513,11 @@ def _add_account_data_for_user(
517513
) -> None:
518514
content_json = json_encoder.encode(content)
519515

520-
# no need to lock here as account_data has a unique constraint on
521-
# (user_id, account_data_type) so simple_upsert will retry if
522-
# there is a conflict.
523516
self.db_pool.simple_upsert_txn(
524517
txn,
525518
table="account_data",
526519
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
527520
values={"stream_id": next_id, "content": content_json},
528-
lock=False,
529521
)
530522

531523
# Ignored users get denormalized into a separate table as an optimisation.

synapse/storage/databases/main/appservice.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,8 +451,6 @@ async def set_appservice_stream_type_pos(
451451
table="application_services_state",
452452
keyvalues={"as_id": service.id},
453453
values={f"{stream_type}_stream_id": pos},
454-
# no need to lock when emulating upsert: as_id is a unique key
455-
lock=False,
456454
desc="set_appservice_stream_type_pos",
457455
)
458456

synapse/storage/databases/main/devices.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1744,9 +1744,6 @@ def _update_remote_device_list_cache_entry_txn(
17441744
table="device_lists_remote_cache",
17451745
keyvalues={"user_id": user_id, "device_id": device_id},
17461746
values={"content": json_encoder.encode(content)},
1747-
# we don't need to lock, because we assume we are the only thread
1748-
# updating this user's devices.
1749-
lock=False,
17501747
)
17511748

17521749
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
@@ -1760,9 +1757,6 @@ def _update_remote_device_list_cache_entry_txn(
17601757
table="device_lists_remote_extremeties",
17611758
keyvalues={"user_id": user_id},
17621759
values={"stream_id": stream_id},
1763-
# again, we can assume we are the only thread updating this user's
1764-
# extremity.
1765-
lock=False,
17661760
)
17671761

17681762
async def update_remote_device_list_cache(
@@ -1815,9 +1809,6 @@ def _update_remote_device_list_cache_txn(
18151809
table="device_lists_remote_extremeties",
18161810
keyvalues={"user_id": user_id},
18171811
values={"stream_id": stream_id},
1818-
# we don't need to lock, because we can assume we are the only thread
1819-
# updating this user's extremity.
1820-
lock=False,
18211812
)
18221813

18231814
async def add_device_change_to_streams(

synapse/storage/databases/main/event_federation.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1686,7 +1686,6 @@ async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
16861686
},
16871687
insertion_values={},
16881688
desc="insert_insertion_extremity",
1689-
lock=False,
16901689
)
16911690

16921691
async def insert_received_event_to_staging(

synapse/storage/databases/main/pusher.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -325,14 +325,11 @@ async def get_throttle_params_by_room(
325325
async def set_throttle_params(
326326
self, pusher_id: str, room_id: str, params: ThrottleParams
327327
) -> None:
328-
# no need to lock because `pusher_throttle` has a primary key on
329-
# (pusher, room_id) so simple_upsert will retry
330328
await self.db_pool.simple_upsert(
331329
"pusher_throttle",
332330
{"pusher": pusher_id, "room_id": room_id},
333331
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
334332
desc="set_throttle_params",
335-
lock=False,
336333
)
337334

338335
async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
@@ -589,8 +586,6 @@ async def add_pusher(
589586
device_id: Optional[str] = None,
590587
) -> None:
591588
async with self._pushers_id_gen.get_next() as stream_id:
592-
# no need to lock because `pushers` has a unique key on
593-
# (app_id, pushkey, user_name) so simple_upsert will retry
594589
await self.db_pool.simple_upsert(
595590
table="pushers",
596591
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
@@ -609,7 +604,6 @@ async def add_pusher(
609604
"device_id": device_id,
610605
},
611606
desc="add_pusher",
612-
lock=False,
613607
)
614608

615609
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(

synapse/storage/databases/main/room.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,9 +1847,6 @@ async def upsert_room_on_join(
18471847
"creator": room_creator,
18481848
"has_auth_chain_index": has_auth_chain_index,
18491849
},
1850-
# rooms has a unique constraint on room_id, so no need to lock when doing an
1851-
# emulated upsert.
1852-
lock=False,
18531850
)
18541851

18551852
async def store_partial_state_room(
@@ -1970,9 +1967,6 @@ async def maybe_store_room_on_outlier_membership(
19701967
"creator": "",
19711968
"has_auth_chain_index": has_auth_chain_index,
19721969
},
1973-
# rooms has a unique constraint on room_id, so no need to lock when doing an
1974-
# emulated upsert.
1975-
lock=False,
19761970
)
19771971

19781972
async def set_room_is_public(self, room_id: str, is_public: bool) -> None:

synapse/storage/databases/main/room_batch.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,4 @@ async def store_state_group_id_for_event_id(
4444
table="event_to_state_groups",
4545
keyvalues={"event_id": event_id},
4646
values={"state_group": state_group_id, "event_id": event_id},
47-
# Unique constraint on event_id so we don't have to lock
48-
lock=False,
4947
)

synapse/storage/databases/main/user_directory.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
481481
table="user_directory",
482482
keyvalues={"user_id": user_id},
483483
values={"display_name": display_name, "avatar_url": avatar_url},
484-
lock=False, # We're only inserter
485484
)
486485

487486
if isinstance(self.database_engine, PostgresEngine):
@@ -511,7 +510,6 @@ def _update_profile_in_user_dir_txn(txn: LoggingTransaction) -> None:
511510
table="user_directory_search",
512511
keyvalues={"user_id": user_id},
513512
values={"value": value},
514-
lock=False, # We're only inserter
515513
)
516514
else:
517515
# This should be unreachable.

0 commit comments

Comments
 (0)