Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rate limiti joins per-room, accounting for joins created by other servers #13169

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docker/complement/conf/workers-shared-extra.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ rc_joins:
per_second: 9999
burst_count: 9999

rc_joins_per_room:
per_second: 9999
burst_count: 9999

rc_3pid_validation:
per_second: 1000
burst_count: 1000
Expand Down
12 changes: 12 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.63.0

## Changes to the event replication streams

Synapse now includes a flag indicating if an event is an outlier when
replicating it to other workers. This is a forwards- and backwards-incompatible
change: v1.62 and workers cannot process events replicated by v1.63 workers, and
vice versa.

Once all workers are upgraded to v1.63 (or downgraded to v1.62), event
replication will resume as normal.

# Upgrading to v1.62.0

## New signatures for spam checker callbacks
Expand Down
16 changes: 16 additions & 0 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -1380,6 +1380,22 @@ rc_joins:
burst_count: 12
```
---
### `rc_joins_per_room`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding info here about what release this config option will be added in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can do.


This option allows for ratelimiting joins to a room based on the number of recent
joins (local or remote) to that room. It is intended to mitigate mass-join spam
waves which target multiple homeservers.

Sensible values for this option are provided by default; most server admins
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
won't need to adjust this setting.

Example configuration:
```yaml
rc_joins_per_room:
per_second: 1
burst_count: 10
```
---
### `rc_3pid_validation`

This option ratelimits how often a user or IP can attempt to validate a 3PID.
Expand Down
100 changes: 88 additions & 12 deletions synapse/api/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,33 @@ class Ratelimiter:
"""
Ratelimit actions marked by arbitrary keys.

This is a "leaky bucket as a meter". For each key to be tracked there is a bucket
containing some number 0 <= T <= `burst_count` of tokens corresponding to previously
permitted requests for that key. Each bucket starts empty, and gradually leaks
tokens at a rate of `rate_hz`.

Upon an incoming request, we must determine:
- the key that this request falls under (which bucket to inspect), and
- the cost C of this request in tokens.
Then, if there is room in the bucket for C tokens (T + C <= `burst_count`),
the request is permitted and `cost` tokens are added to the bucket.
Otherwise the request is denied, and the bucket continues to hold T tokens.

This means that the limiter enforces an average request frequency of `rate_hz`,
while accumulating a buffer of up to `burst_count` requests which can be consumed
instantaneously.

The tricky bit is the leaking. We do not want to have a periodic process which
leaks every bucket! Instead, we track
- the time point when the bucket was last completely empty, and
- how many tokens have added to the bucket permitted since then.
Then for each incoming request, we can calculate how many tokens have leaked
since this time point, and use that to decide if we should accept or reject the
request.

Note that the source code speaks of "actions" and "burst_count" rather than "tokens"
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved
and a "bucket_size".

Args:
clock: A homeserver clock, for retrieving the current time
rate_hz: The long term number of actions that can be performed in a second.
Expand All @@ -41,14 +68,36 @@ def __init__(
self.burst_count = burst_count
self.store = store

# A ordered dictionary keeping track of actions, when they were last
# performed and how often. Each entry is a mapping from a key of arbitrary type
# to a tuple representing:
# * How many times an action has occurred since a point in time
# * The point in time
# * The rate_hz of this particular entry. This can vary per request
# An ordered dictionary representing the token buckets tracked by this rate
# limiter. Each entry maps a key of arbitrary type to a tuple representing:
# * The number of tokens currently in the bucket,
# * The time point when the bucket was last completely empty, and
# * The rate_hz (leak rate) of this particular bucket.
self.actions: OrderedDict[Hashable, Tuple[float, float, float]] = OrderedDict()

def _get_key(
self, requester: Optional[Requester], key: Optional[Hashable]
) -> Hashable:
"""Use the requester's MXID as a fallback key if no key is provided.

Pulled out so that `can_do_action` and `record_action` are consistent.
"""
if key is None:
if not requester:
raise ValueError("Must supply at least one of `requester` or `key`")

key = requester.user.to_string()
return key

def _get_action_counts(
self, key: Hashable, time_now_s: float
) -> Tuple[float, float, float]:
"""Retrieve the action counts, with a fallback representing an empty bucket.

Pulled out so that `can_do_action` and `record_action` are consistent.
"""
return self.actions.get(key, (0.0, time_now_s, 0.0))

async def can_do_action(
self,
requester: Optional[Requester],
Expand Down Expand Up @@ -88,11 +137,7 @@ async def can_do_action(
* The reactor timestamp for when the action can be performed next.
-1 if rate_hz is less than or equal to zero
"""
if key is None:
if not requester:
raise ValueError("Must supply at least one of `requester` or `key`")

key = requester.user.to_string()
key = self._get_key(requester, key)

if requester:
# Disable rate limiting of users belonging to any AS that is configured
Expand Down Expand Up @@ -121,7 +166,7 @@ async def can_do_action(
self._prune_message_counts(time_now_s)

# Check if there is an existing count entry for this key
action_count, time_start, _ = self.actions.get(key, (0.0, time_now_s, 0.0))
action_count, time_start, _ = self._get_action_counts(key, time_now_s)

# Check whether performing another action is allowed
time_delta = time_now_s - time_start
Expand Down Expand Up @@ -164,6 +209,37 @@ async def can_do_action(

return allowed, time_allowed

def record_action(
self,
requester: Optional[Requester],
key: Optional[Hashable] = None,
n_actions: int = 1,
_time_now_s: Optional[float] = None,
) -> None:
"""Record that an action(s) took place, even if they violate the rate limit.

This is useful for tracking the frequency of events that happen across
federation which we still want to impose local rate limits on. For instance, if
we are alice.com monitoring a particular room, we cannot prevent bob.com
from joining users to that room. However, we can track the number of recent
joins in the room and refuse to serve new joins ourselves if there have been too
many in the room across both homeservers.

Args:
requester: The requester that is doing the action, if any.
key: An arbitrary key used to classify an action. Defaults to the
requester's user ID.
n_actions: The number of times the user wants to do this action. If the user
cannot do all of the actions, the user's action count is not incremented
at all.
_time_now_s: The current time. Optional, defaults to the current time according
to self.clock. Only used by tests.
"""
key = self._get_key(requester, key)
time_now_s = _time_now_s if _time_now_s is not None else self.clock.time()
action_count, time_start, rate_hz = self._get_action_counts(key, time_now_s)
self.actions[key] = (action_count + n_actions, time_start, rate_hz)

def _prune_message_counts(self, time_now_s: float) -> None:
"""Remove message count entries that have not exceeded their defined
rate_hz limit
Expand Down
7 changes: 7 additions & 0 deletions synapse/config/ratelimiting.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
defaults={"per_second": 0.01, "burst_count": 10},
)

# Track the rate of joins to a given room. If there are too many, temporarily
# prevent local joins and remote joins via this server.
self.rc_joins_per_room = RateLimitConfig(
config.get("rc_joins_per_room", {}),
defaults={"per_second": 1, "burst_count": 10},
)

# Ratelimit cross-user key requests:
# * For local requests this is keyed by the sending device.
# * For requests received over federation this is keyed by the origin.
Expand Down
3 changes: 3 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ def __init__(self, hs: "HomeServer"):
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
burst_count=hs.config.ratelimiting.rc_joins_local.burst_count,
)
# Tracks joins from local users to rooms this server isn't a member of.
# I.e. joins this server makes by requesting /make_join /send_join from
# another server.
self._join_rate_limiter_remote = Ratelimiter(
store=self.store,
clock=self.clock,
Expand Down
18 changes: 18 additions & 0 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def __init__(self, hs: "HomeServer"):

# Called when there are new things to stream over replication
self.replication_callbacks: List[Callable[[], None]] = []
self._new_join_in_room_callbacks: List[Callable[[str, str], None]] = []

self._federation_client = hs.get_federation_http_client()

Expand Down Expand Up @@ -280,6 +281,19 @@ def add_replication_callback(self, cb: Callable[[], None]) -> None:
"""
self.replication_callbacks.append(cb)

def add_new_join_in_room_callback(self, cb: Callable[[str, str], None]) -> None:
"""Add a callback that will be called when a user joins a room.

This only fires on genuine membership changes, e.g. "invite" -> "join".
Membership transitions like "join" -> "join" (for e.g. displayname changes) do
not trigger the callback.

When called, the callback receives two arguments: the event ID and the room ID.
It should *not* return a Deferred - if it needs to do any asynchronous work, a
background thread should be started and wrapped with run_as_background_process.
"""
self._new_join_in_room_callbacks.append(cb)

async def on_new_room_event(
self,
event: EventBase,
Expand Down Expand Up @@ -723,6 +737,10 @@ def notify_replication(self) -> None:
for cb in self.replication_callbacks:
cb()

def notify_user_joined_room(self, event_id: str, room_id: str) -> None:
for cb in self._new_join_in_room_callbacks:
cb(event_id, room_id)

def notify_remote_server_up(self, server: str) -> None:
"""Notify any replication that a remote server has come back up"""
# We call federation_sender directly rather than registering as a
Expand Down
1 change: 1 addition & 0 deletions synapse/replication/tcp/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class EventsStreamEventRow(BaseEventsStreamRow):
relates_to: Optional[str]
membership: Optional[str]
rejected: bool
outlier: bool


@attr.s(slots=True, frozen=True, auto_attribs=True)
Expand Down
22 changes: 14 additions & 8 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1465,7 +1465,7 @@ async def get_room_complexity(self, room_id: str) -> Dict[str, float]:

async def get_all_new_forward_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns new events, for the Events replication stream

Args:
Expand All @@ -1481,10 +1481,11 @@ async def get_all_new_forward_event_rows(

def get_all_new_forward_event_rows(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
Expand All @@ -1498,7 +1499,8 @@ def get_all_new_forward_event_rows(
)
txn.execute(sql, (last_id, current_id, instance_name, limit))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
txn.fetchall(),
)

return await self.db_pool.runInteraction(
Expand All @@ -1507,7 +1509,7 @@ def get_all_new_forward_event_rows(

async def get_ex_outlier_stream_rows(
self, instance_name: str, last_id: int, current_id: int
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
"""Returns de-outliered events, for the Events replication stream
DMRobertson marked this conversation as resolved.
Show resolved Hide resolved

Args:
Expand All @@ -1522,11 +1524,14 @@ async def get_ex_outlier_stream_rows(

def get_ex_outlier_stream_rows_txn(
txn: LoggingTransaction,
) -> List[Tuple[int, str, str, str, str, str, str, str, str]]:
) -> List[Tuple[int, str, str, str, str, str, str, str, bool, bool]]:
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
" se.state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL,"
" e.outlier"
" FROM events AS e"
# NB: the next line (inner join) is what makes this query different from
# get_all_new_forward_event_rows.
" INNER JOIN ex_outlier_stream AS out USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events AS se USING (event_id)"
Expand All @@ -1541,7 +1546,8 @@ def get_ex_outlier_stream_rows_txn(

txn.execute(sql, (last_id, current_id, instance_name))
return cast(
List[Tuple[int, str, str, str, str, str, str, str, str]], txn.fetchall()
List[Tuple[int, str, str, str, str, str, str, str, bool, bool]],
txn.fetchall(),
)

return await self.db_pool.runInteraction(
Expand Down
Loading