
Don't pull event from DB when handling replication traffic. (#8669)
I was trying to make it so that we didn't have to start a background task when handling RDATA, but that is a bigger job (due to all the code in `generic_worker`). However, I still think that not pulling the event from the DB may help reduce some of the DB usage caused by replication, even if most workers will simply go and pull that event from the DB later anyway.
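
In rough terms, the diff below extends the events replication row with the fields the notifier actually needs (`membership` and a `rejected` flag, alongside the existing `room_id`, `type`, and `state_key`), so the replication client can stop calling `store.get_event` just to inspect them. A minimal, self-contained sketch of that pattern, using simplified stand-ins for the real classes (the `SimplifiedEventRow`, `handle_row`, and `notify` names are illustrative, not Synapse APIs):

```python
# Sketch only: a simplified stand-in for EventsStreamEventRow and the
# replication handler, showing the "no store.get_event() round-trip" idea.
from typing import Callable, Optional

import attr


@attr.s(slots=True, frozen=True)
class SimplifiedEventRow:
    # Mirrors the fields the commit adds to / uses from the replication row.
    event_id = attr.ib(type=str)
    room_id = attr.ib(type=str)
    type = attr.ib(type=str)
    state_key = attr.ib(type=Optional[str])
    membership = attr.ib(type=Optional[str])
    rejected = attr.ib(type=bool)


def handle_row(row: SimplifiedEventRow, notify: Callable[..., None]) -> None:
    """Handle one replication row using only the metadata carried on it."""
    if row.rejected:
        # Previously this check required loading the event from the DB and
        # looking at event.rejected_reason.
        return
    notify(
        room_id=row.room_id,
        event_type=row.type,
        state_key=row.state_key,
        membership=row.membership,
    )


# Example: a membership join arriving over replication.
handle_row(
    SimplifiedEventRow(
        event_id="$abc",
        room_id="!room:example.org",
        type="m.room.member",
        state_key="@alice:example.org",
        membership="join",
        rejected=False,
    ),
    notify=lambda **kwargs: print("notify", kwargs),
)
```

Most workers that need the full event will still fetch it from the DB later, as noted above; the point is that the replication handler itself no longer blocks on that fetch.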

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
erikjohnston and clokep authored Oct 28, 2020
1 parent aff1eb7 commit a6ea1a9
Showing 5 changed files with 87 additions and 31 deletions.
1 change: 1 addition & 0 deletions changelog.d/8669.misc
@@ -0,0 +1 @@
Don't pull event from DB when handling replication traffic.
68 changes: 55 additions & 13 deletions synapse/notifier.py
@@ -28,6 +28,7 @@
Union,
)

import attr
from prometheus_client import Counter

from twisted.internet import defer
@@ -173,6 +174,17 @@ def __bool__(self):
return bool(self.events)


@attr.s(slots=True, frozen=True)
class _PendingRoomEventEntry:
event_pos = attr.ib(type=PersistedEventPosition)
extra_users = attr.ib(type=Collection[UserID])

room_id = attr.ib(type=str)
type = attr.ib(type=str)
state_key = attr.ib(type=Optional[str])
membership = attr.ib(type=Optional[str])


class Notifier:
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -190,9 +202,7 @@ def __init__(self, hs: "synapse.server.HomeServer"):
self.storage = hs.get_storage()
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = (
[]
) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
self.pending_new_room_events = [] # type: List[_PendingRoomEventEntry]

# Called when there are new things to stream over replication
self.replication_callbacks = [] # type: List[Callable[[], None]]
@@ -255,7 +265,29 @@ def on_new_room_event(
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
""" Used by handlers to inform the notifier something has happened
"""Unwraps event and calls `on_new_room_event_args`.
"""
self.on_new_room_event_args(
event_pos=event_pos,
room_id=event.room_id,
event_type=event.type,
state_key=event.get("state_key"),
membership=event.content.get("membership"),
max_room_stream_token=max_room_stream_token,
extra_users=extra_users,
)

def on_new_room_event_args(
self,
room_id: str,
event_type: str,
state_key: Optional[str],
membership: Optional[str],
event_pos: PersistedEventPosition,
max_room_stream_token: RoomStreamToken,
extra_users: Collection[UserID] = [],
):
"""Used by handlers to inform the notifier something has happened
in the room, room event wise.
This triggers the notifier to wake up any listeners that are
@@ -266,7 +298,16 @@ def on_new_room_event(
until all previous events have been persisted before notifying
the client streams.
"""
self.pending_new_room_events.append((event_pos, event, extra_users))
self.pending_new_room_events.append(
_PendingRoomEventEntry(
event_pos=event_pos,
extra_users=extra_users,
room_id=room_id,
type=event_type,
state_key=state_key,
membership=membership,
)
)
self._notify_pending_new_room_events(max_room_stream_token)

self.notify_replication()
@@ -284,18 +325,19 @@ def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken
users = set() # type: Set[UserID]
rooms = set() # type: Set[str]

for event_pos, event, extra_users in pending:
if event_pos.persisted_after(max_room_stream_token):
self.pending_new_room_events.append((event_pos, event, extra_users))
for entry in pending:
if entry.event_pos.persisted_after(max_room_stream_token):
self.pending_new_room_events.append(entry)
else:
if (
event.type == EventTypes.Member
and event.membership == Membership.JOIN
entry.type == EventTypes.Member
and entry.membership == Membership.JOIN
and entry.state_key
):
self._user_joined_room(event.state_key, event.room_id)
self._user_joined_room(entry.state_key, entry.room_id)

users.update(extra_users)
rooms.add(event.room_id)
users.update(entry.extra_users)
rooms.add(entry.room_id)

if users or rooms:
self.on_new_event(
20 changes: 12 additions & 8 deletions synapse/replication/tcp/client.py
@@ -141,21 +141,25 @@ async def on_rdata(
if row.type != EventsStreamEventRow.TypeId:
continue
assert isinstance(row, EventsStreamRow)
assert isinstance(row.data, EventsStreamEventRow)

event = await self.store.get_event(
row.data.event_id, allow_rejected=True
)
if event.rejected_reason:
if row.data.rejected:
continue

extra_users = () # type: Tuple[UserID, ...]
if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),)
if row.data.type == EventTypes.Member and row.data.state_key:
extra_users = (UserID.from_string(row.data.state_key),)

max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
self.notifier.on_new_room_event(
event, event_pos, max_token, extra_users
self.notifier.on_new_room_event_args(
event_pos=event_pos,
max_room_stream_token=max_token,
extra_users=extra_users,
room_id=row.data.room_id,
event_type=row.data.type,
state_key=row.data.state_key,
membership=row.data.membership,
)

# Notify any waiting deferreds. The list is ordered by position so we
21 changes: 13 additions & 8 deletions synapse/replication/tcp/streams/events.py
@@ -15,12 +15,15 @@
# limitations under the License.
import heapq
from collections.abc import Iterable
from typing import List, Tuple, Type
from typing import TYPE_CHECKING, List, Optional, Tuple, Type

import attr

from ._base import Stream, StreamUpdateResult, Token

if TYPE_CHECKING:
from synapse.server import HomeServer

"""Handling of the 'events' replication stream
This stream contains rows of various types. Each row therefore contains a 'type'
@@ -81,12 +84,14 @@ def from_data(cls, data):
class EventsStreamEventRow(BaseEventsStreamRow):
TypeId = "ev"

event_id = attr.ib() # str
room_id = attr.ib() # str
type = attr.ib() # str
state_key = attr.ib() # str, optional
redacts = attr.ib() # str, optional
relates_to = attr.ib() # str, optional
event_id = attr.ib(type=str)
room_id = attr.ib(type=str)
type = attr.ib(type=str)
state_key = attr.ib(type=Optional[str])
redacts = attr.ib(type=Optional[str])
relates_to = attr.ib(type=Optional[str])
membership = attr.ib(type=Optional[str])
rejected = attr.ib(type=bool)


@attr.s(slots=True, frozen=True)
@@ -113,7 +118,7 @@ class EventsStream(Stream):

NAME = "events"

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self._store = hs.get_datastore()
super().__init__(
hs.get_instance_name(),
8 changes: 6 additions & 2 deletions synapse/storage/databases/main/events_worker.py
@@ -1117,11 +1117,13 @@ async def get_all_new_forward_event_rows(
def get_all_new_forward_event_rows(txn):
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" AND instance_name = ?"
" ORDER BY stream_ordering ASC"
@@ -1152,12 +1154,14 @@ async def get_ex_outlier_stream_rows(
def get_ex_outlier_stream_rows_txn(txn):
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts, relates_to_id"
" state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" FROM events AS e"
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" LEFT JOIN event_relations USING (event_id)"
" LEFT JOIN room_memberships USING (event_id)"
" LEFT JOIN rejections USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" AND out.instance_name = ?"
