Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop getting missing prev_events after we already know their signature is invalid #13816

Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
924ae2b
Track when the pulled event signature fails
MadLittleMods Sep 14, 2022
d240aeb
Add changelog
MadLittleMods Sep 14, 2022
cfb4e88
Fix reference
MadLittleMods Sep 14, 2022
9b390a3
Stop getting missing prev_events after we already know their signatur…
MadLittleMods Sep 15, 2022
3d8507d
Merge branch 'develop' into madlittlemods/13700-track-when-event-sign…
MadLittleMods Sep 23, 2022
88a75cf
Use callback pattern to record signature failures
MadLittleMods Sep 23, 2022
d29ac0b
Add docstring
MadLittleMods Sep 24, 2022
14e39ee
Record failure from get_pdu and add test
MadLittleMods Sep 24, 2022
7898371
Be more selective about which errors to care about
MadLittleMods Sep 30, 2022
43f1d1a
Merge branch 'develop' into madlittlemods/13700-track-when-event-sign…
MadLittleMods Sep 30, 2022
e32b901
Merge branch 'madlittlemods/13700-track-when-event-signature-fails' i…
MadLittleMods Sep 30, 2022
83feb1b
Merge branch 'develop' into madlittlemods/13700-track-when-event-sign…
MadLittleMods Oct 1, 2022
7d102e8
Merge branch 'develop' into madlittlemods/13700-track-when-event-sign…
MadLittleMods Oct 1, 2022
81410b6
Merge branch 'madlittlemods/13700-track-when-event-signature-fails' i…
MadLittleMods Oct 1, 2022
958fd3b
Merge branch 'develop' into madlittlemods/13622-13700-stop-getting-st…
MadLittleMods Oct 3, 2022
e24db41
Weird name and add tests
MadLittleMods Oct 4, 2022
f853e78
Add changelog
MadLittleMods Oct 4, 2022
43fb6b8
Bail early and fix lints
MadLittleMods Oct 4, 2022
03f23b7
Add integration test with backfill
MadLittleMods Oct 4, 2022
99d3e79
Fix lints and better message
MadLittleMods Oct 4, 2022
f11f5b5
Fix test description
MadLittleMods Oct 4, 2022
6878faa
Fix test descriptions
MadLittleMods Oct 4, 2022
f3b443d
Scratch debug changes
MadLittleMods Oct 5, 2022
d135d41
Stop cascading backoff errors
MadLittleMods Oct 5, 2022
4effca9
Remove scratch changes
MadLittleMods Oct 5, 2022
e3cc054
Use custom FederationPullAttemptBackoffError
MadLittleMods Oct 5, 2022
74f9e03
Fix test to reflect no more cascade
MadLittleMods Oct 5, 2022
0b900e1
Better comments from what I've gathered
MadLittleMods Oct 5, 2022
3cb2826
Clarify which error in comments
MadLittleMods Oct 5, 2022
5f313df
Add some clarification to the test comment
MadLittleMods Oct 5, 2022
02e6bdd
Merge branch 'develop' into madlittlemods/13622-13700-stop-getting-st…
MadLittleMods Oct 12, 2022
7f4fdd2
Not a SynapseError
MadLittleMods Oct 12, 2022
89ffbcb
Make sure usages of _compute_event_context_with_maybe_missing_prevs h…
MadLittleMods Oct 12, 2022
354f682
Remove double "recently"
MadLittleMods Oct 12, 2022
e0b0447
Do the calculation in Python because it's more clear when we need res…
MadLittleMods Oct 12, 2022
e72c4e5
Use built-in select many function
MadLittleMods Oct 12, 2022
4e08039
No need for txn
MadLittleMods Oct 12, 2022
b060652
Rename function to reflect functionality
MadLittleMods Oct 12, 2022
b1a0c1b
Fix test description to make it accurate
MadLittleMods Oct 12, 2022
bccd802
Use more standard string interpolation with `logger`
MadLittleMods Oct 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13815.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Keep track when an event pulled over federation fails its signature check so we can intelligently back-off in the future.
25 changes: 22 additions & 3 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Awaitable, Callable, Optional

from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
Expand Down Expand Up @@ -58,7 +58,12 @@ def __init__(self, hs: "HomeServer"):

@trace
async def _check_sigs_and_hash(
self, room_version: RoomVersion, pdu: EventBase
self,
room_version: RoomVersion,
pdu: EventBase,
record_failure_callback: Optional[
Callable[[EventBase, str], Awaitable[None]]
] = None,
) -> EventBase:
"""Checks that event is correctly signed by the sending server.

Expand All @@ -70,6 +75,11 @@ async def _check_sigs_and_hash(
Args:
room_version: The room version of the PDU
pdu: the event to be checked
record_failure_callback: A callback to run whenever the given event
fails signature or hash checks. This includes exceptions
that would be normally be thrown/raised but also things like
checking for event tampering where we just return the redacted
event.

Returns:
* the original event if the checks pass
Expand All @@ -80,7 +90,12 @@ async def _check_sigs_and_hash(
InvalidEventSignatureError if the signature check failed. Nothing
will be logged in this case.
"""
await _check_sigs_on_pdu(self.keyring, room_version, pdu)
try:
await _check_sigs_on_pdu(self.keyring, room_version, pdu)
except InvalidEventSignatureError as exc:
if record_failure_callback:
await record_failure_callback(pdu, str(exc))
raise exc

if not check_event_content_hash(pdu):
# let's try to distinguish between failures because the event was
Expand Down Expand Up @@ -116,6 +131,10 @@ async def _check_sigs_and_hash(
"event_id": pdu.event_id,
}
)
if record_failure_callback:
await record_failure_callback(
pdu, "Event content has been tampered with"
)
return redacted_event

spam_check = await self.spam_checker.check_event_for_spam(pdu)
Expand Down
50 changes: 40 additions & 10 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ async def backfill(
pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]

# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_and_fetch(
pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
dest, pdus, room_version=room_version
)

Expand Down Expand Up @@ -328,7 +328,17 @@ async def get_pdu_from_destination_raw(

# Check signatures are correct.
try:
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)

async def _record_failure_callback(
event: EventBase, cause: str
) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)

signed_pdu = await self._check_sigs_and_hash(
room_version, pdu, _record_failure_callback
)
except InvalidEventSignatureError as e:
errmsg = f"event id {pdu.event_id}: {e}"
logger.warning("%s", errmsg)
Expand Down Expand Up @@ -547,24 +557,28 @@ async def get_room_state(
len(auth_event_map),
)

valid_auth_events = await self._check_sigs_and_hash_and_fetch(
valid_auth_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_event_map.values(), room_version
)

valid_state_events = await self._check_sigs_and_hash_and_fetch(
destination, state_event_map.values(), room_version
valid_state_events = (
await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, state_event_map.values(), room_version
)
)

return valid_state_events, valid_auth_events

@trace
async def _check_sigs_and_hash_and_fetch(
async def _check_sigs_and_hash_for_pulled_events_and_fetch(
self,
origin: str,
pdus: Collection[EventBase],
room_version: RoomVersion,
) -> List[EventBase]:
"""Checks the signatures and hashes of a list of events.
"""
Checks the signatures and hashes of a list of pulled events we got from
federation and records any signature failures as failed pull attempts.

If a PDU fails its signature check then we check if we have it in
the database, and if not then request it from the sender's server (if that
Expand Down Expand Up @@ -597,11 +611,17 @@ async def _check_sigs_and_hash_and_fetch(

valid_pdus: List[EventBase] = []

async def _record_failure_callback(event: EventBase, cause: str) -> None:
await self.store.record_event_failed_pull_attempt(
event.room_id, event.event_id, cause
)

async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
pdu=pdu,
origin=origin,
room_version=room_version,
record_failure_callback=_record_failure_callback,
)

if valid_pdu:
Expand All @@ -618,6 +638,9 @@ async def _check_sigs_and_hash_and_fetch_one(
pdu: EventBase,
origin: str,
room_version: RoomVersion,
record_failure_callback: Optional[
Callable[[EventBase, str], Awaitable[None]]
] = None,
) -> Optional[EventBase]:
"""Takes a PDU and checks its signatures and hashes.

Expand All @@ -634,14 +657,21 @@ async def _check_sigs_and_hash_and_fetch_one(
origin
pdu
room_version
record_failure_callback: A callback to run whenever the given event
fails signature or hash checks. This includes exceptions
that would be normally be thrown/raised but also things like
checking for event tampering where we just return the redacted
event.

Returns:
The PDU (possibly redacted) if it has valid signatures and hashes.
None if no valid copy could be found.
"""

try:
return await self._check_sigs_and_hash(room_version, pdu)
return await self._check_sigs_and_hash(
room_version, pdu, record_failure_callback
)
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
Expand Down Expand Up @@ -694,7 +724,7 @@ async def get_event_auth(

auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]

signed_auth = await self._check_sigs_and_hash_and_fetch(
signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_chain, room_version=room_version
)

Expand Down Expand Up @@ -1401,7 +1431,7 @@ async def get_missing_events(
event_from_pdu_json(e, room_version) for e in content.get("events", [])
]

signed_events = await self._check_sigs_and_hash_and_fetch(
signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, events, room_version=room_version
)
except HttpResponseException as e:
Expand Down
6 changes: 6 additions & 0 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,12 @@ async def _compute_event_context_with_maybe_missing_prevs(
seen = await self._store.have_events_in_timeline(prevs)
missing_prevs = prevs - seen

# Filter out events we've tried to pull recently
prevs_to_ignore = await self.store.filter_events_with_pull_attempt_backoff(
room_id, missing_prevs
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
missing_prevs = missing_prevs - prevs_to_ignore
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

if not missing_prevs:
return await self._state_handler.compute_event_context(event)

Expand Down
86 changes: 79 additions & 7 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
cast,
Expand Down Expand Up @@ -73,14 +74,13 @@

logger = logging.getLogger(__name__)

BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
datetime.timedelta(days=7).total_seconds()
)
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
datetime.timedelta(hours=1).total_seconds()
)


# All the info we need while iterating the DAG while backfilling
@attr.s(frozen=True, slots=True, auto_attribs=True)
class BackfillQueueNavigationItem:
Expand Down Expand Up @@ -851,8 +851,8 @@ def get_backfill_points_in_room_txn(
False,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
limit,
),
)
Expand Down Expand Up @@ -967,8 +967,8 @@ def get_insertion_event_backward_extremities_in_room_txn(
room_id,
current_depth,
self._clock.time_msec(),
1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
limit,
),
)
Expand Down Expand Up @@ -1506,6 +1506,78 @@ def _record_event_failed_pull_attempt_upsert_txn(

txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))

@trace
async def filter_events_with_pull_attempt_backoff(
self,
room_id: str,
event_ids: Sequence[str],
) -> List[str]:
"""
Filter out events that we've failed to pull before
recently. Uses exponential backoff.

Args:
room_id: The room that the events belong to
event_ids: A list of events to filter down

Returns:
List of event_ids that can be attempted to be pulled
"""
return await self.db_pool.runInteraction(
"filter_events_with_pull_attempt_backoff",
self._filter_events_with_pull_attempt_backoff_txn,
room_id,
event_ids,
)

def _filter_events_with_pull_attempt_backoff_txn(
self,
txn: LoggingTransaction,
room_id: str,
event_ids: Sequence[str],
) -> None:
where_event_ids_match_clause, values = make_in_list_sql_clause(
txn.database_engine, "event_id", event_ids
)

sql = """
SELECT event_id FROM event_failed_pull_attempts
WHERE
room_id = ?
%s /* where_event_ids_match_clause */
/**
* Exponential back-off (up to the upper bound) so we don't try to
* pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
*
* We use `1 << n` as a power of 2 equivalent for compatibility
* with older SQLites. The left shift equivalent only works with
* powers of 2 because left shift is a binary operation (base-2).
* Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
*/
AND (
event_id IS NULL
OR ? /* current_time */ >= last_attempt_ts + /*least*/%s((1 << num_attempts) * ? /* step */, ? /* upper bound */)
)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
"""

if isinstance(self.database_engine, PostgresEngine):
least_function = "least"
elif isinstance(self.database_engine, Sqlite3Engine):
least_function = "min"
else:
raise RuntimeError("Unknown database engine")

txn.execute(
sql % (where_event_ids_match_clause, least_function),
(
room_id,
*values,
self._clock.time_msec(),
1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
),
)

async def get_missing_events(
self,
room_id: str,
Expand Down
Loading