Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 5d0b626

Browse files
committed
add suppport for persisting batched events over replication
1 parent 494e167 commit 5d0b626

File tree

3 files changed

+194
-2
lines changed

3 files changed

+194
-2
lines changed

synapse/handlers/message.py

+27-2
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
from synapse.logging.context import make_deferred_yieldable, run_in_background
5757
from synapse.metrics.background_process_metrics import run_as_background_process
5858
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
59+
from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
5960
from synapse.storage.databases.main.events import PartialStateConflictError
6061
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
6162
from synapse.storage.state import StateFilter
@@ -494,6 +495,7 @@ def __init__(self, hs: "HomeServer"):
494495
self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
495496

496497
self.send_event = ReplicationSendEventRestServlet.make_client(hs)
498+
self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
497499

498500
self.request_ratelimiter = hs.get_request_ratelimiter()
499501

@@ -1362,7 +1364,7 @@ async def handle_create_room_events(
13621364

13631365
async def _persist_events_batch(
13641366
self,
1365-
requestor: Requester,
1367+
requester: Requester,
13661368
events_and_ctx: List[Tuple[EventBase, EventContext]],
13671369
ratelimit: bool = True,
13681370
) -> EventBase:
@@ -1378,8 +1380,31 @@ async def _persist_events_batch(
13781380
event, context
13791381
)
13801382
try:
1383+
# If we're a worker we need to hit out to the master.
1384+
writer_instance = self._events_shard_config.get_instance(event.room_id)
1385+
if writer_instance != self._instance_name:
1386+
try:
1387+
result = await self.send_events(
1388+
instance_name=writer_instance,
1389+
store=self.store,
1390+
requester=requester,
1391+
events_and_ctx=events_and_ctx,
1392+
ratelimit=ratelimit,
1393+
)
1394+
except SynapseError as e:
1395+
if e.code == HTTPStatus.CONFLICT:
1396+
raise PartialStateConflictError()
1397+
raise
1398+
stream_id = result["stream_id"]
1399+
1400+
# If we newly persisted the event then we need to update its
1401+
# stream_ordering entry manually (as it was persisted on
1402+
# another worker).
1403+
event.internal_metadata.stream_ordering = stream_id
1404+
return event
1405+
13811406
last_event = await self.persist_and_notify_batched_events(
1382-
requestor, events_and_ctx, ratelimit
1407+
requester, events_and_ctx, ratelimit
13831408
)
13841409
except Exception:
13851410
# Ensure that we actually remove the entries in the push actions

synapse/replication/http/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
push,
2626
register,
2727
send_event,
28+
send_events,
2829
state,
2930
streams,
3031
)
@@ -43,6 +44,7 @@ def __init__(self, hs: "HomeServer"):
4344

4445
def register_servlets(self, hs: "HomeServer") -> None:
4546
send_event.register_servlets(hs, self)
47+
send_events.register_servlets(hs, self)
4648
federation.register_servlets(hs, self)
4749
presence.register_servlets(hs, self)
4850
membership.register_servlets(hs, self)
+165
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright 2022 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
from typing import TYPE_CHECKING, List, Tuple
17+
18+
from twisted.web.server import Request
19+
20+
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
21+
from synapse.events import EventBase, make_event_from_dict
22+
from synapse.events.snapshot import EventContext
23+
from synapse.http.server import HttpServer
24+
from synapse.http.servlet import parse_json_object_from_request
25+
from synapse.replication.http._base import ReplicationEndpoint
26+
from synapse.types import JsonDict, Requester
27+
from synapse.util.metrics import Measure
28+
29+
if TYPE_CHECKING:
30+
from synapse.server import HomeServer
31+
from synapse.storage.databases.main import DataStore
32+
33+
logger = logging.getLogger(__name__)
34+
35+
36+
class ReplicationSendEventsRestServlet(ReplicationEndpoint):
37+
"""Handles batches of newly created events on workers, including persisting and
38+
notifying.
39+
40+
The API looks like:
41+
42+
POST /_synapse/replication/send_events/:txn_id
43+
44+
{
45+
"events": [{
46+
"event": { .. serialized event .. },
47+
"room_version": .., // "1", "2", "3", etc: the version of the room
48+
// containing the event
49+
"event_format_version": .., // 1,2,3 etc: the event format version
50+
"internal_metadata": { .. serialized internal_metadata .. },
51+
"outlier": true|false,
52+
"rejected_reason": .., // The event.rejected_reason field
53+
"context": { .. serialized event context .. },
54+
"requester": { .. serialized requester .. },
55+
"ratelimit": true,
56+
}]
57+
}
58+
59+
200 OK
60+
61+
{ "stream_id": 12345, "event_id": "$abcdef..." }
62+
63+
Responds with a 409 when a `PartialStateConflictError` is raised due to an event
64+
context that needs to be recomputed due to the un-partial stating of a room.
65+
66+
"""
67+
68+
NAME = "send_events"
69+
PATH_ARGS = ()
70+
71+
def __init__(self, hs: "HomeServer"):
72+
super().__init__(hs)
73+
74+
self.event_creation_handler = hs.get_event_creation_handler()
75+
self.store = hs.get_datastores().main
76+
self._storage_controllers = hs.get_storage_controllers()
77+
self.clock = hs.get_clock()
78+
79+
@staticmethod
80+
async def _serialize_payload( # type: ignore[override]
81+
store: "DataStore",
82+
events_and_ctx: List[Tuple[EventBase, EventContext]],
83+
requester: Requester,
84+
ratelimit: bool,
85+
) -> JsonDict:
86+
"""
87+
Args:
88+
store
89+
requester
90+
events_and_ctx
91+
ratelimit
92+
"""
93+
serialized_events = []
94+
95+
for event, context in events_and_ctx:
96+
serialized_context = await context.serialize(event, store)
97+
serialized_event = {
98+
"event": event.get_pdu_json(),
99+
"room_version": event.room_version.identifier,
100+
"event_format_version": event.format_version,
101+
"internal_metadata": event.internal_metadata.get_dict(),
102+
"outlier": event.internal_metadata.is_outlier(),
103+
"rejected_reason": event.rejected_reason,
104+
"context": serialized_context,
105+
"requester": requester.serialize(),
106+
"ratelimit": ratelimit,
107+
}
108+
serialized_events.append(serialized_event)
109+
110+
payload = {"events": serialized_events}
111+
112+
return payload
113+
114+
async def _handle_request( # type: ignore[override]
115+
self, request: Request
116+
) -> Tuple[int, JsonDict]:
117+
with Measure(self.clock, "repl_send_events_parse"):
118+
payload = parse_json_object_from_request(request)
119+
events_and_ctx = []
120+
events = payload["events"]
121+
122+
for event_payload in events:
123+
event_dict = event_payload["event"]
124+
room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
125+
internal_metadata = event_payload["internal_metadata"]
126+
rejected_reason = event_payload["rejected_reason"]
127+
128+
event = make_event_from_dict(
129+
event_dict, room_ver, internal_metadata, rejected_reason
130+
)
131+
event.internal_metadata.outlier = event_payload["outlier"]
132+
133+
requester = Requester.deserialize(
134+
self.store, event_payload["requester"]
135+
)
136+
context = EventContext.deserialize(
137+
self._storage_controllers, event_payload["context"]
138+
)
139+
140+
ratelimit = event_payload["ratelimit"]
141+
events_and_ctx.append((event, context))
142+
143+
logger.info(
144+
"Got batch of events to send, last ID of batch is: %s, sending into room: %s",
145+
event.event_id,
146+
event.room_id,
147+
)
148+
149+
last_event = (
150+
await self.event_creation_handler.persist_and_notify_batched_events(
151+
requester, events_and_ctx, ratelimit
152+
)
153+
)
154+
155+
return (
156+
200,
157+
{
158+
"stream_id": last_event.internal_metadata.stream_ordering,
159+
"event_id": last_event.event_id,
160+
},
161+
)
162+
163+
164+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
165+
ReplicationSendEventsRestServlet(hs).register(http_server)

0 commit comments

Comments
 (0)