This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Preparatory work for batching events to send #13487

Merged
merged 8 commits on Sep 28, 2022
1 change: 1 addition & 0 deletions changelog.d/13487.misc
@@ -0,0 +1 @@
Batch up events to be bulk persisted when creating a new room.
58 changes: 47 additions & 11 deletions synapse/handlers/message.py
@@ -567,9 +567,17 @@ async def create_event(
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
state_map: Optional[Mapping[Tuple[str, str], str]] = None,
for_batch: bool = False,
current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
Given a dict from a client, create a new event.
Given a dict from a client, create a new event. If for_batch is true, the event
will be created using the given prev_event_ids, and its event context will be
computed from state_map and current_state_group, so both of those parameters
must be provided whenever for_batch is True. The resulting event and context are
suitable for being batched up and bulk persisted to the database with other
similarly created events.

Creates a FrozenEvent object, filling out auth_events, prev_events,
etc.
@@ -612,16 +620,27 @@ async def create_event(
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.

historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.

depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.

state_map: A state map of previously created events, used only when creating events
for batch persisting.

for_batch: Whether the event is being created for batch persisting to the db.

current_state_group: The current state group, used only when creating events for
batch persisting.

Raises:
ResourceLimitError if the server is blocked due to some resource being
exceeded

Returns:
Tuple of created event, Context
"""
@@ -685,23 +704,40 @@ async def create_event(

builder.internal_metadata.historical = historical

event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
state_event_ids=state_event_ids,
depth=depth,
)
if for_batch:
assert prev_event_ids is not None
assert state_map is not None
assert current_state_group is not None
auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
event = await builder.build(
prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
)
context = await self.state.compute_event_context_for_batched(
event, state_map, current_state_group
)

else:
event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
state_event_ids=state_event_ids,
depth=depth,
)

# In an ideal world we wouldn't need the second part of this condition. However,
# this behaviour isn't spec'd yet, meaning we should be able to deactivate this
# behaviour. Another reason is that this code is also evaluated each time a new
# m.room.aliases event is created, which includes hitting a /directory route.
# Therefore not including this condition here would render the similar one in
# synapse.handlers.directory pointless.
if builder.type == EventTypes.Aliases and self.require_membership_for_aliases:
if (
builder.type == EventTypes.Aliases
and self.require_membership_for_aliases
and not for_batch
):
# Ideally we'd do the membership check in event_auth.check(), which
# describes a spec'd algorithm for authenticating events received over
# federation as well as those created locally. As of room v3, aliases events
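
As context for the for_batch branch added above, here is a much-simplified, self-contained sketch of the idea behind compute_auth_events(builder, state_map): the auth events for a new event are looked up from the in-memory state map instead of being fetched from the database. The needed_auth_types list and the event IDs are invented for illustration and do not reflect Synapse's full auth rules.

```python
from typing import Dict, List, Tuple

StateMap = Dict[Tuple[str, str], str]


def pick_auth_event_ids(sender: str, state_map: StateMap) -> List[str]:
    # A much-simplified stand-in for compute_auth_events(builder, state_map):
    # the auth events a new event needs are looked up from the in-memory state
    # map, skipping any that have not been created yet.
    needed_auth_types = [
        ("m.room.create", ""),
        ("m.room.power_levels", ""),
        ("m.room.member", sender),
    ]
    return [state_map[key] for key in needed_auth_types if key in state_map]


state_map: StateMap = {
    ("m.room.create", ""): "$create:example.org",
    ("m.room.member", "@alice:example.org"): "$member:example.org",
}
print(pick_auth_event_ids("@alice:example.org", state_map))
```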
155 changes: 109 additions & 46 deletions synapse/handlers/room.py
@@ -716,7 +716,7 @@ async def create_room(

if (
self._server_notices_mxid is not None
and requester.user.to_string() == self._server_notices_mxid
and user_id == self._server_notices_mxid
):
# allow the server notices mxid to create rooms
is_requester_admin = True
@@ -1042,7 +1042,9 @@ async def _send_events_for_new_room(
creator_join_profile: Optional[JsonDict] = None,
ratelimit: bool = True,
) -> Tuple[int, str, int]:
"""Sends the initial events into a new room.
"""Sends the initial events into a new room. Sends the room creation, membership,
and power level events into the room sequentially, then creates and batches up the
rest of the events to persist as a batch to the DB.

`power_level_content_override` doesn't apply when initial state has
power level state event content.
@@ -1053,46 +1055,74 @@
"""

creator_id = creator.user.to_string()

event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}

depth = 1
# the last event sent/persisted to the db
last_sent_event_id: Optional[str] = None

def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
# the ID of the most recently created event, used as the prev_event of the next one
prev_event: List[str] = []
# a map of (event type, state key) -> event_id. We collect these mappings as events
# are created (but not yet persisted to the db) so that the state for subsequently
# created events can be determined without pulling it from the db
state_map: dict = {}
# current_state_group of last event created. Used for computing event context of
# events to be batched
current_state_group = None

def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}

e.update(event_keys)
e.update(kwargs)

return e

async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
nonlocal last_sent_event_id
async def create_event(
etype: str,
content: JsonDict,
for_batch: bool,
**kwargs: Any,
) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
nonlocal depth
nonlocal prev_event

event = create(etype, content, **kwargs)
logger.debug("Sending %s in new room", etype)
# Allow these events to be sent even if the user is shadow-banned to
# allow the room creation to complete.
(
sent_event,
last_stream_id,
) = await self.event_creation_handler.create_and_send_nonmember_event(
event_dict = create_event_dict(etype, content, **kwargs)

new_event, new_context = await self.event_creation_handler.create_event(
creator,
event,
event_dict,
prev_event_ids=prev_event,
depth=depth,
state_map=state_map,
for_batch=for_batch,
current_state_group=current_state_group,
)
depth += 1
prev_event = [new_event.event_id]
state_map[(new_event.type, new_event.state_key)] = new_event.event_id

return new_event, new_context

async def send(
event: EventBase,
context: synapse.events.snapshot.EventContext,
creator: Requester,
) -> int:
nonlocal last_sent_event_id

ev = await self.event_creation_handler.handle_new_client_event(
requester=creator,
event=event,
context=context,
ratelimit=False,
ignore_shadow_ban=True,
# Note: we don't pass state_event_ids here because this triggers
# an additional query per event to look them up from the events table.
prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
depth=depth,
)

last_sent_event_id = sent_event.event_id
depth += 1
last_sent_event_id = ev.event_id

return last_stream_id
# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
return ev.internal_metadata.stream_ordering

try:
config = self._presets_dict[preset_config]
@@ -1102,9 +1132,13 @@ async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
)

creation_content.update({"creator": creator_id})
await send(etype=EventTypes.Create, content=creation_content)
creation_event, creation_context = await create_event(
EventTypes.Create, creation_content, False
)

logger.debug("Sending %s in new room", EventTypes.Member)
await send(creation_event, creation_context, creator)

# Room create event must exist at this point
assert last_sent_event_id is not None
member_event_id, _ = await self.room_member_handler.update_membership(
@@ -1119,14 +1153,22 @@ async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
depth=depth,
)
last_sent_event_id = member_event_id
prev_event = [member_event_id]

# update the depth and state map here as the membership event has been created
# through a different code path
depth += 1
state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id

# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
last_sent_stream_id = await send(
etype=EventTypes.PowerLevels, content=pl_content
power_event, power_context = await create_event(
EventTypes.PowerLevels, pl_content, False
)
current_state_group = power_context._state_group
last_sent_stream_id = await send(power_event, power_context, creator)
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
@@ -1169,47 +1211,68 @@ async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
# apply those.
if power_level_content_override:
power_level_content.update(power_level_content_override)

last_sent_stream_id = await send(
etype=EventTypes.PowerLevels, content=power_level_content
pl_event, pl_context = await create_event(
EventTypes.PowerLevels,
power_level_content,
False,
)
current_state_group = pl_context._state_group
last_sent_stream_id = await send(pl_event, pl_context, creator)

events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.CanonicalAlias,
content={"alias": room_alias.to_string()},
room_alias_event, room_alias_context = await create_event(
EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
)
current_state_group = room_alias_context._state_group
events_to_send.append((room_alias_event, room_alias_context))

if (EventTypes.JoinRules, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
join_rules_event, join_rules_context = await create_event(
EventTypes.JoinRules,
{"join_rule": config["join_rules"]},
True,
)
current_state_group = join_rules_context._state_group
events_to_send.append((join_rules_event, join_rules_context))

if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.RoomHistoryVisibility,
content={"history_visibility": config["history_visibility"]},
visibility_event, visibility_context = await create_event(
EventTypes.RoomHistoryVisibility,
{"history_visibility": config["history_visibility"]},
True,
)
current_state_group = visibility_context._state_group
events_to_send.append((visibility_event, visibility_context))

if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
last_sent_stream_id = await send(
etype=EventTypes.GuestAccess,
content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
guest_access_event, guest_access_context = await create_event(
EventTypes.GuestAccess,
{EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
True,
)
current_state_group = guest_access_context._state_group
events_to_send.append((guest_access_event, guest_access_context))

for (etype, state_key), content in initial_state.items():
last_sent_stream_id = await send(
etype=etype, state_key=state_key, content=content
event, context = await create_event(
etype, content, True, state_key=state_key
)
current_state_group = context._state_group
events_to_send.append((event, context))

if config["encrypted"]:
last_sent_stream_id = await send(
etype=EventTypes.RoomEncryption,
encryption_event, encryption_context = await create_event(
EventTypes.RoomEncryption,
{"algorithm": RoomEncryptionAlgorithms.DEFAULT},
True,
state_key="",
content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
)
events_to_send.append((encryption_event, encryption_context))

for event, context in events_to_send:
last_sent_stream_id = await send(event, context, creator)
return last_sent_stream_id, last_sent_event_id, depth

def _generate_room_id(self) -> str:
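
To summarise the room-creation changes above, here is a rough, self-contained sketch of the overall pattern: build events against an in-memory state map while tracking depth and prev_event, collect them into events_to_send, and only then hand the whole batch over for persistence. SimpleEvent and persist_batch are invented stand-ins for illustration; they are not Synapse's EventBase, EventContext, or handle_new_client_event.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class SimpleEvent:
    # Invented stand-in for EventBase; real events carry many more fields.
    event_id: str
    etype: str
    state_key: str
    content: dict
    prev_event_ids: List[str]
    depth: int


async def persist_batch(events: List[SimpleEvent]) -> None:
    # Stand-in for the bulk-persist step; Synapse would compute event contexts
    # and write the whole batch to the database together.
    print("persisting", [e.event_id for e in events])


async def build_initial_room_events(server_name: str) -> List[SimpleEvent]:
    # In-memory bookkeeping, mirroring the handler: state map, prev event, depth.
    state_map: Dict[Tuple[str, str], str] = {}
    prev_event: List[str] = []
    depth = 1
    events_to_send: List[SimpleEvent] = []

    def create_event(etype: str, content: dict, state_key: str = "") -> SimpleEvent:
        nonlocal depth, prev_event
        event = SimpleEvent(
            event_id=f"${etype}-{depth}:{server_name}",
            etype=etype,
            state_key=state_key,
            content=content,
            prev_event_ids=list(prev_event),
            depth=depth,
        )
        # Track depth, prev_event and state in memory so later events can be
        # built without reading anything back from the database.
        depth += 1
        prev_event = [event.event_id]
        state_map[(etype, state_key)] = event.event_id
        return event

    for etype, content in [
        ("m.room.join_rules", {"join_rule": "invite"}),
        ("m.room.history_visibility", {"history_visibility": "shared"}),
        ("m.room.guest_access", {"guest_access": "can_join"}),
    ]:
        events_to_send.append(create_event(etype, content))
    return events_to_send


async def main() -> None:
    batch = await build_initial_room_events("example.org")
    await persist_batch(batch)


asyncio.run(main())
```

The important property is that nothing in the loop needs to read previously created events back from the database, which is what makes bulk persisting them afterwards possible.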