Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Do better
Browse files Browse the repository at this point in the history
  • Loading branch information
Mathieu Velten committed Sep 4, 2023
1 parent 45b6be2 commit c496729
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 15 deletions.
9 changes: 3 additions & 6 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,11 @@ async def delete_messages_for_device(
log_kv({"message": "No changes in cache since last check"})
return 0

ROW_ID_LITERAL = "ctid" if isinstance(self.database_engine, PostgresEngine) else "rowid"

def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
sql = (
f"DELETE FROM device_inbox WHERE {ROW_ID_LITERAL} IN ("
f"SELECT {ROW_ID_LITERAL} FROM device_inbox"
" WHERE user_id = ? AND device_id = ?"
" AND stream_id <= ?"
f"DELETE FROM device_inbox WHERE {self.database_engine.row_id_name} IN ("
f"SELECT {self.database_engine.row_id_name} FROM device_inbox"
" WHERE user_id = ? AND device_id = ? AND stream_id <= ?"
)
if limit:
sql += f" LIMIT {limit}"
Expand Down
9 changes: 2 additions & 7 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,11 +939,6 @@ async def _background_receipts_linearized_unique_index(
receipts."""

def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
if isinstance(self.database_engine, PostgresEngine):
ROW_ID_NAME = "ctid"
else:
ROW_ID_NAME = "rowid"

# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# The following query takes less than a minute on matrix.org.
Expand All @@ -962,7 +957,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
# `stream_id`, we delete by the ctid instead.
for stream_id, room_id, receipt_type, user_id in duplicate_keys:
sql = f"""
SELECT {ROW_ID_NAME}
SELECT {self.database_engine.row_id_name}
FROM receipts_linearized
WHERE
room_id = ? AND
Expand All @@ -982,7 +977,7 @@ def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
receipt_type = ? AND
user_id = ? AND
thread_id IS NULL AND
{ROW_ID_NAME} != ?
{self.database_engine.row_id_name} != ?
"""
txn.execute(sql, (room_id, receipt_type, user_id, row_id))

Expand Down
6 changes: 6 additions & 0 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ def server_version(self) -> str:
"""Gets a string giving the server version. For example: '3.22.0'"""
...

@property
@abc.abstractmethod
def row_id_name(self) -> str:
"""Gets the literal name representing a row id for this engine."""
...

@abc.abstractmethod
def in_transaction(self, conn: ConnectionType) -> bool:
"""Whether the connection is currently in a transaction."""
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ def server_version(self) -> str:
else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)

@property
def row_id_name(self) -> str:
return "ctid"

def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY

Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/engines/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ def server_version(self) -> str:
"""Gets a string giving the server version. For example: '3.22.0'."""
return "%i.%i.%i" % sqlite3.sqlite_version_info

@property
def row_id_name(self) -> str:
return "rowid"

def in_transaction(self, conn: sqlite3.Connection) -> bool:
return conn.in_transaction

Expand Down
4 changes: 2 additions & 2 deletions synapse/storage/schema/main/delta/48/group_unique_indexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements

FIX_INDEXES = """
Expand All @@ -37,7 +37,7 @@


def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
rowid = database_engine.row_id_name

# remove duplicates from group_users & group_invites tables
cur.execute(
Expand Down

0 comments on commit c496729

Please sign in to comment.