Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 8bd3ae7

Browse files
committed
Move autocommit functionality to database class
1 parent 2242bec commit 8bd3ae7

File tree

5 files changed

+50
-20
lines changed

5 files changed

+50
-20
lines changed

synapse/storage/database.py

+30-6
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,12 @@ def new_transaction(
566566
sql_txn_timer.labels(desc).observe(duration)
567567

568568
async def runInteraction(
569-
self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
569+
self,
570+
desc: str,
571+
func: "Callable[..., R]",
572+
*args: Any,
573+
db_autocommit: bool = False,
574+
**kwargs: Any
570575
) -> R:
571576
"""Starts a transaction on the database and runs a given function
572577
@@ -576,6 +581,10 @@ async def runInteraction(
576581
database transaction (twisted.enterprise.adbapi.Transaction) as
577582
its first argument, followed by `args` and `kwargs`.
578583
584+
db_autocommit: Whether to run the function in "autocommit" mode,
585+
i.e. outside of a transaction. This is useful for transaction
586+
that are only a single query. Currently only affects postgres.
587+
579588
args: positional args to pass to `func`
580589
kwargs: named args to pass to `func`
581590
@@ -596,6 +605,7 @@ async def runInteraction(
596605
exception_callbacks,
597606
func,
598607
*args,
608+
db_autocommit=db_autocommit,
599609
**kwargs
600610
)
601611

@@ -609,7 +619,11 @@ async def runInteraction(
609619
return cast(R, result)
610620

611621
async def runWithConnection(
612-
self, func: "Callable[..., R]", *args: Any, **kwargs: Any
622+
self,
623+
func: "Callable[..., R]",
624+
*args: Any,
625+
db_autocommit: bool = False,
626+
**kwargs: Any
613627
) -> R:
614628
"""Wraps the .runWithConnection() method on the underlying db_pool.
615629
@@ -618,6 +632,9 @@ async def runWithConnection(
618632
database connection (twisted.enterprise.adbapi.Connection) as
619633
its first argument, followed by `args` and `kwargs`.
620634
args: positional args to pass to `func`
635+
db_autocommit: Whether to run the function in "autocommit" mode,
636+
i.e. outside of a transaction. This is useful for transaction
637+
that are only a single query. Currently only affects postgres.
621638
kwargs: named args to pass to `func`
622639
623640
Returns:
@@ -649,10 +666,17 @@ def inner_func(conn, *args, **kwargs):
649666
logger.debug("Reconnecting closed database connection")
650667
conn.reconnect()
651668

652-
db_conn = LoggingDatabaseConnection(
653-
conn, self.engine, "runWithConnection"
654-
)
655-
return func(db_conn, *args, **kwargs)
669+
try:
670+
if db_autocommit:
671+
self.engine.set_autocommit(conn, True)
672+
673+
db_conn = LoggingDatabaseConnection(
674+
conn, self.engine, "runWithConnection"
675+
)
676+
return func(db_conn, *args, **kwargs)
677+
finally:
678+
if db_autocommit:
679+
self.engine.set_autocommit(conn, False)
656680

657681
return await make_deferred_yieldable(
658682
self._db_pool.runWithConnection(inner_func, *args, **kwargs)

synapse/storage/engines/_base.py

+8
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,11 @@ def in_transaction(self, conn: Connection) -> bool:
103103
"""Whether the connection is currently in a transaction.
104104
"""
105105
...
106+
107+
@abc.abstractmethod
108+
def set_autocommit(self, conn: Connection, autocommit: bool):
109+
"""Set the connections autocommit mode.
110+
111+
When True queries are run outside of transactions.
112+
"""
113+
...

synapse/storage/engines/postgres.py

+3
Original file line numberDiff line numberDiff line change
@@ -177,3 +177,6 @@ def server_version(self):
177177

178178
def in_transaction(self, conn: Connection) -> bool:
179179
return conn.status != self.module.extensions.STATUS_READY # type: ignore
180+
181+
def set_autocommit(self, conn: Connection, autocommit: bool):
182+
return conn.set_session(autocommit=autocommit) # type: ignore

synapse/storage/engines/sqlite.py

+5
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ def server_version(self):
110110
def in_transaction(self, conn: Connection) -> bool:
111111
return conn.in_transaction # type: ignore
112112

113+
def set_autocommit(self, conn: Connection, autocommit: bool):
114+
# Twisted doesn't let us set attributes on the connections, so we can't
115+
# set the connection to autocommit mode.
116+
pass
117+
113118

114119
# Following functions taken from: https://github.com/coleifer/peewee
115120

synapse/storage/util/id_generators.py

+4-14
Original file line numberDiff line numberDiff line change
@@ -574,18 +574,6 @@ def _update_stream_positions_table_txn(self, txn: LoggingTransaction):
574574
pos = (self.get_current_token_for_writer(self._instance_name),)
575575
txn.execute(sql, (self._stream_name, self._instance_name, pos))
576576

577-
def _update_stream_positions_table_conn(self, conn: LoggingDatabaseConnection):
578-
# We use autocommit/read committed here so that we don't have to go
579-
# through a transaction dance, which a) adds latency and b) runs the
580-
# risk of serialization errors.
581-
try:
582-
conn.conn.set_session(autocommit=True) # type: ignore
583-
584-
with conn.cursor(txn_name="MultiWriterIdGenerator._update_table") as cur:
585-
self._update_stream_positions_table_txn(cur)
586-
finally:
587-
conn.conn.set_session(autocommit=False) # type: ignore
588-
589577

590578
@attr.s(slots=True)
591579
class _AsyncCtxManagerWrapper:
@@ -649,8 +637,10 @@ async def __aexit__(self, exc_type, exc, tb):
649637
# We only do this on the success path so that the persisted current
650638
# position points to a persisted row with the correct instance name.
651639
if self.id_gen._writers:
652-
await self.id_gen._db.runWithConnection(
653-
self.id_gen._update_stream_positions_table_conn,
640+
await self.id_gen._db.runInteraction(
641+
"MultiWriterIdGenerator._update_table",
642+
self.id_gen._update_stream_positions_table_txn,
643+
db_autocommit=True,
654644
)
655645

656646
return False

0 commit comments

Comments
 (0)