Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Make token serializing/deserializing async (#8427)
Browse files Browse the repository at this point in the history
The idea is that in future tokens will encode a mapping of instance to position. However, we don't want to include the full instance name in the string representation, so instead we'll have a mapping between instance name and an immutable integer ID in the DB that we can use instead. We'll then do the lookup when we serialize/deserialize the token (we could alternatively pass around an `Instance` type that includes both the name and ID, but that turns out to be a lot more invasive).
  • Loading branch information
erikjohnston authored Sep 30, 2020
1 parent a0a1ba6 commit 7941372
Show file tree
Hide file tree
Showing 17 changed files with 115 additions and 59 deletions.
1 change: 1 addition & 0 deletions changelog.d/8427.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make stream token serializing/deserializing async.
4 changes: 2 additions & 2 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ async def get_stream(

chunk = {
"chunk": chunks,
"start": tokens[0].to_string(),
"end": tokens[1].to_string(),
"start": await tokens[0].to_string(self.store),
"end": await tokens[1].to_string(self.store),
}

return chunk
Expand Down
14 changes: 7 additions & 7 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ async def handle_room(event: RoomsForUser):
messages, time_now=time_now, as_client_event=as_client_event
)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
}

d["state"] = await self._event_serializer.serialize_events(
Expand Down Expand Up @@ -249,7 +249,7 @@ async def handle_room(event: RoomsForUser):
],
"account_data": account_data_events,
"receipts": receipt,
"end": now_token.to_string(),
"end": await now_token.to_string(self.store),
}

return ret
Expand Down Expand Up @@ -348,8 +348,8 @@ async def _room_initial_sync_parted(
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
await self._event_serializer.serialize_events(
Expand Down Expand Up @@ -447,8 +447,8 @@ async def get_receipts():
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
"start": start_token.to_string(),
"end": end_token.to_string(),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": state,
"presence": presence,
Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ async def get_messages(
if not events:
return {
"chunk": [],
"start": from_token.to_string(),
"end": next_token.to_string(),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

state = None
Expand Down Expand Up @@ -442,8 +442,8 @@ async def get_messages(
events, time_now, as_client_event=as_client_event
)
),
"start": from_token.to_string(),
"end": next_token.to_string(),
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}

if state:
Expand Down
8 changes: 5 additions & 3 deletions synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,11 +1077,13 @@ def filter_evts(events):
# the token, which we replace.
token = StreamToken.START

results["start"] = token.copy_and_replace(
results["start"] = await token.copy_and_replace(
"room_key", results["start"]
).to_string()
).to_string(self.store)

results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
results["end"] = await token.copy_and_replace(
"room_key", results["end"]
).to_string(self.store)

return results

Expand Down
8 changes: 4 additions & 4 deletions synapse/handlers/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,13 +362,13 @@ async def search(self, user, content, batch=None):
self.storage, user.to_string(), res["events_after"]
)

res["start"] = now_token.copy_and_replace(
res["start"] = await now_token.copy_and_replace(
"room_key", res["start"]
).to_string()
).to_string(self.store)

res["end"] = now_token.copy_and_replace(
res["end"] = await now_token.copy_and_replace(
"room_key", res["end"]
).to_string()
).to_string(self.store)

if include_profile:
senders = {
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async def on_POST(self, request, room_id, event_id):
raise SynapseError(400, "Event is for wrong room.")

room_token = await self.store.get_topological_token_for_event(event_id)
token = str(room_token)
token = await room_token.to_string(self.store)

logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
elif "purge_up_to_ts" in body:
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, hs):
super().__init__()
self.event_stream_handler = hs.get_event_stream_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
Expand All @@ -44,7 +45,7 @@ async def on_GET(self, request):
if b"room_id" in request.args:
room_id = request.args[b"room_id"][0].decode("ascii")

pagin_config = PaginationConfig.from_request(request)
pagin_config = await PaginationConfig.from_request(self.store, request)
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
if b"timeout" in request.args:
try:
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ def __init__(self, hs):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request)
as_client_event = b"raw" not in request.args
pagination_config = PaginationConfig.from_request(request)
pagination_config = await PaginationConfig.from_request(self.store, request)
include_archived = parse_boolean(request, "archived", default=False)
content = await self.initial_sync_handler.snapshot_all_rooms(
user_id=requester.user.to_string(),
Expand Down
11 changes: 8 additions & 3 deletions synapse/rest/client/v1/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ def __init__(self, hs):
super().__init__()
self.message_handler = hs.get_message_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
Expand All @@ -465,7 +466,7 @@ async def on_GET(self, request, room_id):
if at_token_string is None:
at_token = None
else:
at_token = StreamToken.from_string(at_token_string)
at_token = await StreamToken.from_string(self.store, at_token_string)

# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
Expand Down Expand Up @@ -521,10 +522,13 @@ def __init__(self, hs):
super().__init__()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request, default_limit=10)
pagination_config = await PaginationConfig.from_request(
self.store, request, default_limit=10
)
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, b"filter", encoding="utf-8")
if filter_str:
Expand Down Expand Up @@ -580,10 +584,11 @@ def __init__(self, hs):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
pagination_config = PaginationConfig.from_request(request)
pagination_config = await PaginationConfig.from_request(self.store, request)
content = await self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v2_alpha/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def __init__(self, hs):
super().__init__()
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
self.store = hs.get_datastore()

async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
Expand All @@ -191,7 +192,7 @@ async def on_GET(self, request):
# changes after the "to" as well as before.
set_tag("to", parse_string(request, "to"))

from_token = StreamToken.from_string(from_token_string)
from_token = await StreamToken.from_string(self.store, from_token_string)

user_id = requester.user.to_string()

Expand Down
10 changes: 5 additions & 5 deletions synapse/rest/client/v2_alpha/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def __init__(self, hs):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.sync_handler = hs.get_sync_handler()
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
Expand Down Expand Up @@ -151,10 +152,9 @@ async def on_GET(self, request):
device_id=device_id,
)

since_token = None
if since is not None:
since_token = StreamToken.from_string(since)
else:
since_token = None
since_token = await StreamToken.from_string(self.store, since)

# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(user.to_string())
Expand Down Expand Up @@ -236,7 +236,7 @@ async def encode_response(self, time_now, sync_result, access_token_id, filter):
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(),
"next_batch": await sync_result.next_batch.to_string(self.store),
}

@staticmethod
Expand Down Expand Up @@ -413,7 +413,7 @@ def serialize(events):
result = {
"timeline": {
"events": serialized_timeline,
"prev_batch": room.timeline.prev_batch.to_string(),
"prev_batch": await room.timeline.prev_batch.to_string(self.store),
"limited": room.timeline.limited,
},
"state": {"events": serialized_state},
Expand Down
8 changes: 4 additions & 4 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ async def purge_history(
The set of state groups that are referenced by deleted events.
"""

parsed_token = await RoomStreamToken.parse(self, token)

return await self.db_pool.runInteraction(
"purge_history",
self._purge_history_txn,
room_id,
token,
parsed_token,
delete_local_events,
)

def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
token = RoomStreamToken.parse(token_str)

def _purge_history_txn(self, txn, room_id, token, delete_local_events):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
Expand Down
9 changes: 5 additions & 4 deletions synapse/streams/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Optional

Expand All @@ -21,6 +20,7 @@
from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.storage.databases.main import DataStore
from synapse.types import StreamToken

logger = logging.getLogger(__name__)
Expand All @@ -39,8 +39,9 @@ class PaginationConfig:
limit = attr.ib(type=Optional[int])

@classmethod
def from_request(
async def from_request(
cls,
store: "DataStore",
request: SynapseRequest,
raise_invalid_params: bool = True,
default_limit: Optional[int] = None,
Expand All @@ -54,13 +55,13 @@ def from_request(
if from_tok == "END":
from_tok = None # For backwards compat.
elif from_tok:
from_tok = StreamToken.from_string(from_tok)
from_tok = await StreamToken.from_string(store, from_tok)
except Exception:
raise SynapseError(400, "'from' parameter is invalid")

try:
if to_tok:
to_tok = StreamToken.from_string(to_tok)
to_tok = await StreamToken.from_string(store, to_tok)
except Exception:
raise SynapseError(400, "'to' parameter is invalid")

Expand Down
Loading

0 comments on commit 7941372

Please sign in to comment.