Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 085aec6

Browse files
committed
Persist pagination sessions to the database.
1 parent 0ace38b commit 085aec6

File tree

7 files changed

+238
-32
lines changed

7 files changed

+238
-32
lines changed

changelog.d/10613.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add pagination to the spaces summary based on updates to [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946).

mypy.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ files =
5454
synapse/storage/databases/main/keys.py,
5555
synapse/storage/databases/main/pusher.py,
5656
synapse/storage/databases/main/registration.py,
57+
synapse/storage/databases/main/room_summary.py,
5758
synapse/storage/databases/main/stream.py,
5859
synapse/storage/databases/main/ui_auth.py,
5960
synapse/storage/database.py,

synapse/app/generic_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@
114114
MonthlyActiveUsersWorkerStore,
115115
)
116116
from synapse.storage.databases.main.presence import PresenceStore
117+
from synapse.storage.databases.main.room_summary import RoomSummaryStore
117118
from synapse.storage.databases.main.search import SearchStore
118119
from synapse.storage.databases.main.stats import StatsStore
119120
from synapse.storage.databases.main.transactions import TransactionWorkerStore
@@ -250,6 +251,7 @@ class GenericWorkerSlavedStore(
250251
SearchStore,
251252
TransactionWorkerStore,
252253
LockStore,
254+
RoomSummaryStore,
253255
BaseSlavedStore,
254256
):
255257
pass

synapse/handlers/room_summary.py

Lines changed: 22 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,11 @@
2828
Membership,
2929
RoomTypes,
3030
)
31-
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
31+
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
3232
from synapse.events import EventBase
3333
from synapse.events.utils import format_event_for_client_v2
3434
from synapse.types import JsonDict
3535
from synapse.util.caches.response_cache import ResponseCache
36-
from synapse.util.stringutils import random_string
3736

3837
if TYPE_CHECKING:
3938
from synapse.server import HomeServer
@@ -87,12 +86,6 @@ def __init__(self, hs: "HomeServer"):
8786
self._server_name = hs.hostname
8887
self._federation_client = hs.get_federation_client()
8988

90-
# A map of query information to the current pagination state.
91-
#
92-
# TODO Allow for multiple workers to share this data.
93-
# TODO Expire pagination tokens.
94-
self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
95-
9689
# If a user tries to fetch the same page multiple times in quick succession,
9790
# only process the first attempt and return its result to subsequent requests.
9891
self._pagination_response_cache: ResponseCache[
@@ -102,20 +95,12 @@ def __init__(self, hs: "HomeServer"):
10295
"get_room_hierarchy",
10396
)
10497

105-
def _expire_pagination_sessions(self):
98+
async def _expire_pagination_sessions(self):
10699
"""Expire pagination session which are old."""
107100
expire_before = (
108101
self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS
109102
)
110-
to_expire = []
111-
112-
for key, value in self._pagination_sessions.items():
113-
if value.creation_time_ms < expire_before:
114-
to_expire.append(key)
115-
116-
for key in to_expire:
117-
logger.debug("Expiring pagination session id %s", key)
118-
del self._pagination_sessions[key]
103+
await self._store.delete_old_room_hierarchy_pagination_sessions(expire_before)
119104

120105
async def get_space_summary(
121106
self,
@@ -327,17 +312,21 @@ async def _get_room_hierarchy(
327312

328313
# If this is continuing a previous session, pull the persisted data.
329314
if from_token:
330-
self._expire_pagination_sessions()
315+
await self._expire_pagination_sessions()
331316

332-
pagination_key = _PaginationKey(
333-
requested_room_id, suggested_only, max_depth, from_token
334-
)
335-
if pagination_key not in self._pagination_sessions:
317+
try:
318+
pagination_session = (
319+
await self._store.get_room_hierarchy_pagination_session(
320+
requested_room_id, suggested_only, max_depth, from_token
321+
)
322+
)
323+
except StoreError:
336324
raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
337325

338326
# Load the previous state.
339-
pagination_session = self._pagination_sessions[pagination_key]
340-
room_queue = pagination_session.room_queue
327+
room_queue = [
328+
_RoomQueueEntry(*fields) for fields in pagination_session.room_queue
329+
]
341330
processed_rooms = pagination_session.processed_rooms
342331
else:
343332
# The queue of rooms to process, the next room is last on the stack.
@@ -456,13 +445,14 @@ async def _get_room_hierarchy(
456445

457446
# If there's additional data, generate a pagination token (and persist state).
458447
if room_queue:
459-
next_batch = random_string(24)
460-
result["next_batch"] = next_batch
461-
pagination_key = _PaginationKey(
462-
requested_room_id, suggested_only, max_depth, next_batch
463-
)
464-
self._pagination_sessions[pagination_key] = _PaginationSession(
465-
self._clock.time_msec(), room_queue, processed_rooms
448+
result[
449+
"next_batch"
450+
] = await self._store.create_room_hierarchy_pagination_session(
451+
requested_room_id,
452+
suggested_only,
453+
max_depth,
454+
[attr.astuple(room_entry) for room_entry in room_queue], # type: ignore[misc]
455+
processed_rooms,
466456
)
467457

468458
return result

synapse/storage/databases/main/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
from .rejections import RejectionsStore
6262
from .relations import RelationsStore
6363
from .room import RoomStore
64+
from .room_summary import RoomSummaryStore
6465
from .roommember import RoomMemberStore
6566
from .search import SearchStore
6667
from .signatures import SignatureStore
@@ -121,6 +122,7 @@ class DataStore(
121122
ServerMetricsStore,
122123
EventForwardExtremitiesStore,
123124
LockStore,
125+
RoomSummaryStore,
124126
):
125127
def __init__(self, database: DatabasePool, db_conn, hs):
126128
self.hs = hs
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2021 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import json
16+
import logging
17+
from typing import List, Optional, Sequence, Set, Tuple
18+
19+
import attr
20+
21+
import synapse.util.stringutils as stringutils
22+
from synapse.api.errors import StoreError
23+
from synapse.storage._base import SQLBaseStore, db_to_json
24+
from synapse.storage.database import LoggingTransaction
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
@attr.s(slots=True, frozen=True, auto_attribs=True)
30+
class _PaginationSession:
31+
"""The information that is stored for pagination."""
32+
33+
# The queue of rooms which are still to process as packed _RoomQueueEntry tuples.
34+
room_queue: List[Tuple[str, Sequence[str], int]]
35+
# A set of rooms which have been processed.
36+
processed_rooms: Set[str]
37+
38+
39+
class RoomSummaryStore(SQLBaseStore):
40+
"""
41+
Manage user interactive authentication sessions.
42+
"""
43+
44+
async def create_room_hierarchy_pagination_session(
45+
self,
46+
room_id: str,
47+
suggested_only: bool,
48+
max_depth: Optional[int],
49+
room_queue: List[Tuple[str, Sequence[str], int]],
50+
processed_rooms: Set[str],
51+
) -> str:
52+
"""
53+
Creates a new pagination session for the room hierarchy endpoint.
54+
55+
Args:
56+
room_id: The room ID the pagination session is for.
57+
suggested_only: Whether we should only return children with the
58+
"suggested" flag set.
59+
max_depth: The maximum depth in the tree to explore, must be a
60+
non-negative integer.
61+
room_queue:
62+
The queue of rooms which are still to process.
63+
processed_rooms:
64+
A set of rooms which have been processed.
65+
66+
Returns:
67+
The newly created session ID.
68+
69+
Raises:
70+
StoreError if a unique session ID cannot be generated.
71+
"""
72+
pagination_state = json.dumps(
73+
{
74+
"room_queue": room_queue,
75+
"processed_rooms": list(processed_rooms),
76+
}
77+
)
78+
79+
# autogen a session ID and try to create it. We may clash, so just
80+
# try a few times till one goes through, giving up eventually.
81+
attempts = 0
82+
while attempts < 5:
83+
session_id = stringutils.random_string(24)
84+
85+
try:
86+
await self.db_pool.simple_insert(
87+
table="room_hierarchy_pagination_sessions",
88+
values={
89+
"session_id": session_id,
90+
"room_id": room_id,
91+
"suggested_only": suggested_only,
92+
"max_depth": max_depth,
93+
"pagination_state": pagination_state,
94+
"creation_time": self.hs.get_clock().time_msec(),
95+
},
96+
desc="create_room_hierarchy_pagination_session",
97+
)
98+
logger.debug(
99+
"Persisted room hierarchy pagination session: %s for room %s (suggested: %s, max_depth: %s)",
100+
session_id,
101+
room_id,
102+
suggested_only,
103+
max_depth,
104+
)
105+
106+
return session_id
107+
except self.db_pool.engine.module.IntegrityError:
108+
attempts += 1
109+
raise StoreError(500, "Couldn't generate a session ID.")
110+
111+
async def get_room_hierarchy_pagination_session(
112+
self,
113+
room_id: str,
114+
suggested_only: bool,
115+
max_depth: Optional[int],
116+
session_id: str,
117+
) -> _PaginationSession:
118+
"""
119+
Retrieve data stored with set_session_data
120+
121+
Args:
122+
room_id: The room ID the pagination session is for.
123+
suggested_only: Whether we should only return children with the
124+
"suggested" flag set.
125+
max_depth: The maximum depth in the tree to explore, must be a
126+
non-negative integer.
127+
session_id: The pagination session ID.
128+
129+
Raises:
130+
StoreError if the session cannot be found.
131+
"""
132+
logger.debug(
133+
"Fetch room hierarchy pagination session: %s for room %s (suggested: %s, max_depth: %s)",
134+
session_id,
135+
room_id,
136+
suggested_only,
137+
max_depth,
138+
)
139+
result = await self.db_pool.simple_select_one(
140+
table="room_hierarchy_pagination_sessions",
141+
keyvalues={
142+
"session_id": session_id,
143+
"room_id": room_id,
144+
"suggested_only": suggested_only,
145+
},
146+
retcols=(
147+
"max_depth",
148+
"pagination_state",
149+
),
150+
desc="get_room_hierarchy_pagination_sessions",
151+
)
152+
# Check the value of max_depth separately since null != null.
153+
if result["max_depth"] != max_depth:
154+
raise StoreError(404, "No row found (room_hierarchy_pagination_sessions)")
155+
156+
pagination_state = db_to_json(result["pagination_state"])
157+
158+
return _PaginationSession(
159+
room_queue=pagination_state["room_queue"],
160+
processed_rooms=set(pagination_state["processed_rooms"]),
161+
)
162+
163+
async def delete_old_room_hierarchy_pagination_sessions(
164+
self, expiration_time: int
165+
) -> None:
166+
"""
167+
Remove sessions which were last used earlier than the expiration time.
168+
169+
Args:
170+
expiration_time: The latest time that is still considered valid.
171+
This is an epoch time in milliseconds.
172+
173+
"""
174+
await self.db_pool.runInteraction(
175+
"delete_old_room_hierarchy_pagination_sessions",
176+
self._delete_old_room_hierarchy_pagination_sessions_txn,
177+
expiration_time,
178+
)
179+
180+
def _delete_old_room_hierarchy_pagination_sessions_txn(
181+
self, txn: LoggingTransaction, expiration_time: int
182+
):
183+
# Get the expired sessions.
184+
sql = "DELETE FROM room_hierarchy_pagination_sessions WHERE creation_time <= ?"
185+
txn.execute(sql, [expiration_time])
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2021 The Matrix.org Foundation C.I.C.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
CREATE TABLE IF NOT EXISTS room_hierarchy_pagination_sessions(
18+
session_id TEXT NOT NULL, -- The session ID passed to the client.
19+
creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds).
20+
room_id TEXT NOT NULL, -- The room ID of the pagination session.
21+
suggested_only BOOLEAN NOT NULL, -- Whether to only include suggested rooms/spaces.
22+
max_depth int, -- The maximum depth to fetch.
23+
pagination_state TEXT NOT NULL, -- A JSON dictionary of persisted state.
24+
UNIQUE (session_id)
25+
);

0 commit comments

Comments
 (0)