Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Enable passing typing stream writers as a list. #11237

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11237.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Allow `stream_writers.typing` config to be a list of one worker.
18 changes: 15 additions & 3 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class WriterLocations:

Attributes:
events: The instances that write to the event and backfill streams.
typing: The instance that writes to the typing stream.
typing: The instances that write to the typing stream. Currently
can only be a single instance.
to_device: The instances that write to the to_device stream. Currently
can only be a single instance.
account_data: The instances that write to the account data streams. Currently
Expand All @@ -75,9 +76,15 @@ class WriterLocations:
"""

events = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
typing = attr.ib(
default=["master"],
type=List[str],
converter=_instance_to_list_converter,
)
typing = attr.ib(default="master", type=str)
to_device = attr.ib(
default=["master"],
type=List[str],
Expand Down Expand Up @@ -217,6 +224,11 @@ def read_config(self, config, **kwargs):
% (instance, stream)
)

if len(self.writers.typing) != 1:
raise ConfigError(
"Must only specify one instance to handle `typing` messages."
)

if len(self.writers.to_device) != 1:
raise ConfigError(
"Must only specify one instance to handle `to_device` messages."
Expand Down
4 changes: 0 additions & 4 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,10 +1232,6 @@ def register_query_handler(

self.query_handlers[query_type] = handler

def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
"""Register that the EDU handler is on a different instance than master."""
self._edu_type_to_instance[edu_type] = [instance_name]

def register_instances_for_edu(
self, edu_type: str, instance_names: List[str]
) -> None:
Expand Down
6 changes: 3 additions & 3 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ def __init__(self, hs: "HomeServer"):
if hs.should_send_federation():
self.federation = hs.get_federation_sender()

if hs.config.worker.writers.typing != hs.get_instance_name():
hs.get_federation_registry().register_instance_for_edu(
if hs.get_instance_name() not in hs.config.worker.writers.typing:
hs.get_federation_registry().register_instances_for_edu(
"m.typing",
hs.config.worker.writers.typing,
)
Expand Down Expand Up @@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

assert hs.config.worker.writers.typing == hs.get_instance_name()
assert hs.get_instance_name() in hs.config.worker.writers.typing

self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def __init__(self, hs: "HomeServer"):
if isinstance(stream, TypingStream):
# Only add TypingStream as a source on the instance in charge of
# typing.
if hs.config.worker.writers.typing == hs.get_instance_name():
if hs.get_instance_name() in hs.config.worker.writers.typing:
self._streams_to_replicate.append(stream)

continue
Expand Down
3 changes: 1 addition & 2 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,7 @@ class TypingStream(Stream):
ROW_TYPE = TypingStreamRow

def __init__(self, hs: "HomeServer"):
writer_instance = hs.config.worker.writers.typing
if writer_instance == hs.get_instance_name():
if hs.get_instance_name() in hs.config.worker.writers.typing:
# On the writer, query the typing handler
typing_writer_handler = hs.get_typing_writer_handler()
update_function: Callable[
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,7 +914,7 @@ def __init__(self, hs: "HomeServer"):
# If we're not on the typing writer instance we should scream if we get
# requests.
self._is_typing_writer = (
hs.config.worker.writers.typing == hs.get_instance_name()
hs.get_instance_name() in hs.config.worker.writers.typing
)

async def on_PUT(
Expand Down
4 changes: 2 additions & 2 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def get_presence_handler(self) -> BasePresenceHandler:

@cache_in_self
def get_typing_writer_handler(self) -> TypingWriterHandler:
if self.config.worker.writers.typing == self.get_instance_name():
if self.get_instance_name() in self.config.worker.writers.typing:
return TypingWriterHandler(self)
else:
raise Exception("Workers cannot write typing")
Expand All @@ -474,7 +474,7 @@ def get_presence_router(self) -> PresenceRouter:

@cache_in_self
def get_typing_handler(self) -> FollowerTypingHandler:
if self.config.worker.writers.typing == self.get_instance_name():
if self.get_instance_name() in self.config.worker.writers.typing:
# Use get_typing_writer_handler to ensure that we use the same
# cached version.
return self.get_typing_writer_handler()
Expand Down