Skip to content

Commit b913aaa

Browse files
Sliding sync: Store the per-connection state in the database. (#17599)
Based on #17600 --------- Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
1 parent dab88a7 commit b913aaa

File tree

14 files changed

+695
-117
lines changed

14 files changed

+695
-117
lines changed

changelog.d/17599.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Store sliding sync per-connection state in the database.

synapse/app/generic_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
from synapse.storage.databases.main.search import SearchStore
9999
from synapse.storage.databases.main.session import SessionStore
100100
from synapse.storage.databases.main.signatures import SignatureWorkerStore
101+
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
101102
from synapse.storage.databases.main.state import StateGroupWorkerStore
102103
from synapse.storage.databases.main.stats import StatsStore
103104
from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -159,6 +160,7 @@ class GenericWorkerStore(
159160
SessionStore,
160161
TaskSchedulerWorkerStore,
161162
ExperimentalFeaturesStore,
163+
SlidingSyncStore,
162164
):
163165
# Properties that multiple storage classes define. Tell mypy what the
164166
# expected type is.

synapse/handlers/sliding_sync/__init__.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ def __init__(self, hs: "HomeServer"):
100100
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
101101
self.is_mine_id = hs.is_mine_id
102102

103-
self.connection_store = SlidingSyncConnectionStore()
103+
self.connection_store = SlidingSyncConnectionStore(self.store)
104104
self.extensions = SlidingSyncExtensionHandler(hs)
105105
self.room_lists = SlidingSyncRoomLists(hs)
106106

@@ -221,16 +221,11 @@ async def current_sync_for_user(
221221
# amount of time (more with round-trips and re-processing) in the end to
222222
# get everything again.
223223
previous_connection_state = (
224-
await self.connection_store.get_per_connection_state(
224+
await self.connection_store.get_and_clear_connection_positions(
225225
sync_config, from_token
226226
)
227227
)
228228

229-
await self.connection_store.mark_token_seen(
230-
sync_config=sync_config,
231-
from_token=from_token,
232-
)
233-
234229
# Get all of the room IDs that the user should be able to see in the sync
235230
# response
236231
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0

synapse/handlers/sliding_sync/store.py

Lines changed: 35 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
#
1414

1515
import logging
16-
from typing import TYPE_CHECKING, Dict, Optional, Tuple
16+
from typing import TYPE_CHECKING, Optional
1717

1818
import attr
1919

20-
from synapse.api.errors import SlidingSyncUnknownPosition
2120
from synapse.logging.opentracing import trace
21+
from synapse.storage.databases.main import DataStore
2222
from synapse.types import SlidingSyncStreamToken
2323
from synapse.types.handlers.sliding_sync import (
2424
MutablePerConnectionState,
@@ -61,22 +61,9 @@ class SlidingSyncConnectionStore:
6161
to mapping of room ID to `HaveSentRoom`.
6262
"""
6363

64-
# `(user_id, conn_id)` -> `connection_position` -> `PerConnectionState`
65-
_connections: Dict[Tuple[str, str], Dict[int, PerConnectionState]] = attr.Factory(
66-
dict
67-
)
64+
store: "DataStore"
6865

69-
async def is_valid_token(
70-
self, sync_config: SlidingSyncConfig, connection_token: int
71-
) -> bool:
72-
"""Return whether the connection token is valid/recognized"""
73-
if connection_token == 0:
74-
return True
75-
76-
conn_key = self._get_connection_key(sync_config)
77-
return connection_token in self._connections.get(conn_key, {})
78-
79-
async def get_per_connection_state(
66+
async def get_and_clear_connection_positions(
8067
self,
8168
sync_config: SlidingSyncConfig,
8269
from_token: Optional[SlidingSyncStreamToken],
@@ -86,23 +73,21 @@ async def get_per_connection_state(
8673
Raises:
8774
SlidingSyncUnknownPosition if the connection_token is unknown
8875
"""
89-
if from_token is None:
76+
# If this is our first request, there is no previous connection state to fetch out of the database
77+
if from_token is None or from_token.connection_position == 0:
9078
return PerConnectionState()
9179

92-
connection_position = from_token.connection_position
93-
if connection_position == 0:
94-
# Initial sync (request without a `from_token`) starts at `0` so
95-
# there is no existing per-connection state
96-
return PerConnectionState()
97-
98-
conn_key = self._get_connection_key(sync_config)
99-
sync_statuses = self._connections.get(conn_key, {})
100-
connection_state = sync_statuses.get(connection_position)
80+
conn_id = sync_config.conn_id or ""
10181

102-
if connection_state is None:
103-
raise SlidingSyncUnknownPosition()
82+
device_id = sync_config.requester.device_id
83+
assert device_id is not None
10484

105-
return connection_state
85+
return await self.store.get_and_clear_connection_positions(
86+
sync_config.user.to_string(),
87+
device_id,
88+
conn_id,
89+
from_token.connection_position,
90+
)
10691

10792
@trace
10893
async def record_new_state(
@@ -116,85 +101,28 @@ async def record_new_state(
116101
If there are no changes to the state this may return the same token as
117102
the existing per-connection state.
118103
"""
119-
prev_connection_token = 0
120-
if from_token is not None:
121-
prev_connection_token = from_token.connection_position
122-
123104
if not new_connection_state.has_updates():
124-
return prev_connection_token
125-
126-
conn_key = self._get_connection_key(sync_config)
127-
sync_statuses = self._connections.setdefault(conn_key, {})
128-
129-
# Generate a new token, removing any existing entries in that token
130-
# (which can happen if requests get resent).
131-
new_store_token = prev_connection_token + 1
132-
sync_statuses.pop(new_store_token, None)
133-
134-
# We copy the `MutablePerConnectionState` so that the inner `ChainMap`s
135-
# don't grow forever.
136-
sync_statuses[new_store_token] = new_connection_state.copy()
137-
138-
return new_store_token
105+
if from_token is not None:
106+
return from_token.connection_position
107+
else:
108+
return 0
109+
110+
# A from token with a zero connection position means there was no
111+
# previously stored connection state, so we treat a zero the same as
112+
# there being no previous position.
113+
previous_connection_position = None
114+
if from_token is not None and from_token.connection_position != 0:
115+
previous_connection_position = from_token.connection_position
139116

140-
@trace
141-
async def mark_token_seen(
142-
self,
143-
sync_config: SlidingSyncConfig,
144-
from_token: Optional[SlidingSyncStreamToken],
145-
) -> None:
146-
"""We have received a request with the given token, so we can clear out
147-
any other tokens associated with the connection.
148-
149-
If there is no from token then we have started afresh, and so we delete
150-
all tokens associated with the device.
151-
"""
152-
# Clear out any tokens for the connection that doesn't match the one
153-
# from the request.
154-
155-
conn_key = self._get_connection_key(sync_config)
156-
sync_statuses = self._connections.pop(conn_key, {})
157-
if from_token is None:
158-
return
159-
160-
sync_statuses = {
161-
connection_token: room_statuses
162-
for connection_token, room_statuses in sync_statuses.items()
163-
if connection_token == from_token.connection_position
164-
}
165-
if sync_statuses:
166-
self._connections[conn_key] = sync_statuses
167-
168-
@staticmethod
169-
def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
170-
"""Return a unique identifier for this connection.
171-
172-
The first part is simply the user ID.
173-
174-
The second part is generally a combination of device ID and conn_id.
175-
However, both these two are optional (e.g. puppet access tokens don't
176-
have device IDs), so this handles those edge cases.
177-
178-
We use this over the raw `conn_id` to avoid clashes between different
179-
clients that use the same `conn_id`. Imagine a user uses a web client
180-
that uses `conn_id: main_sync_loop` and an Android client that also has
181-
a `conn_id: main_sync_loop`.
182-
"""
183-
184-
user_id = sync_config.user.to_string()
185-
186-
# Only one sliding sync connection is allowed per given conn_id (empty
187-
# or not).
188117
conn_id = sync_config.conn_id or ""
189118

190-
if sync_config.requester.device_id:
191-
return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
192-
193-
if sync_config.requester.access_token_id:
194-
# If we don't have a device, then the access token ID should be a
195-
# stable ID.
196-
return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
119+
device_id = sync_config.requester.device_id
120+
assert device_id is not None
197121

198-
# If we have neither then its likely an AS or some weird token. Either
199-
# way we can just fail here.
200-
raise Exception("Cannot use sliding sync with access token type")
122+
return await self.store.persist_per_connection_state(
123+
sync_config.user.to_string(),
124+
device_id,
125+
conn_id,
126+
previous_connection_position,
127+
new_connection_state,
128+
)

synapse/storage/database.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
from synapse.storage.background_updates import BackgroundUpdater
6565
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
6666
from synapse.storage.types import Connection, Cursor, SQLQueryParameters
67+
from synapse.types import StrCollection
6768
from synapse.util.async_helpers import delay_cancellation
6869
from synapse.util.iterutils import batch_iter
6970

@@ -1095,6 +1096,48 @@ def simple_insert_txn(
10951096

10961097
txn.execute(sql, vals)
10971098

1099+
@staticmethod
1100+
def simple_insert_returning_txn(
1101+
txn: LoggingTransaction,
1102+
table: str,
1103+
values: Dict[str, Any],
1104+
returning: StrCollection,
1105+
) -> Tuple[Any, ...]:
1106+
"""Executes a `INSERT INTO... RETURNING...` statement (or equivalent for
1107+
SQLite versions that don't support it).
1108+
"""
1109+
1110+
if txn.database_engine.supports_returning:
1111+
sql = "INSERT INTO %s (%s) VALUES(%s) RETURNING %s" % (
1112+
table,
1113+
", ".join(k for k in values.keys()),
1114+
", ".join("?" for _ in values.keys()),
1115+
", ".join(k for k in returning),
1116+
)
1117+
1118+
txn.execute(sql, list(values.values()))
1119+
row = txn.fetchone()
1120+
assert row is not None
1121+
return row
1122+
else:
1123+
# For old versions of SQLite we do a standard insert and then can
1124+
# use `last_insert_rowid` to get at the row we just inserted
1125+
DatabasePool.simple_insert_txn(
1126+
txn,
1127+
table=table,
1128+
values=values,
1129+
)
1130+
txn.execute("SELECT last_insert_rowid()")
1131+
row = txn.fetchone()
1132+
assert row is not None
1133+
(rowid,) = row
1134+
1135+
row = DatabasePool.simple_select_one_txn(
1136+
txn, table=table, keyvalues={"rowid": rowid}, retcols=returning
1137+
)
1138+
assert row is not None
1139+
return row
1140+
10981141
async def simple_insert_many(
10991142
self,
11001143
table: str,

synapse/storage/databases/main/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
LoggingDatabaseConnection,
3434
LoggingTransaction,
3535
)
36+
from synapse.storage.databases.main.sliding_sync import SlidingSyncStore
3637
from synapse.storage.databases.main.stats import UserSortOrder
3738
from synapse.storage.engines import BaseDatabaseEngine
3839
from synapse.storage.types import Cursor
@@ -156,6 +157,7 @@ class DataStore(
156157
LockStore,
157158
SessionStore,
158159
TaskSchedulerWorkerStore,
160+
SlidingSyncStore,
159161
):
160162
def __init__(
161163
self,

0 commit comments

Comments
 (0)