Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Make handle_new_client_event throws PartialStateConflictError #14665

Merged
merged 14 commits into from
Dec 15, 2022
1 change: 1 addition & 0 deletions changelog.d/14665.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change `handle_new_client_event` signature so that a 429 does not reach clients on `PartialStateConflictError`, and internally retry when needed instead.
117 changes: 78 additions & 39 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1343,32 +1343,53 @@ async def exchange_third_party_invite(
)

EventValidator().validate_builder(builder)
event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)

event, context = await self.add_display_name_to_third_party_invite(
room_version_obj, event_dict, event, context
)
# Try several times, it could fail with PartialStateConflictError
# in send_membership_event, cf comment in except block.
max_retries = 5
for i in range(max_retries):
try:
(
event,
context,
) = await self.event_creation_handler.create_new_client_event(
builder=builder
)

EventValidator().validate_new(event, self.config)
event, context = await self.add_display_name_to_third_party_invite(
room_version_obj, event_dict, event, context
)

# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = self.hs.hostname
EventValidator().validate_new(event, self.config)

try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(event)
except AuthError as e:
logger.warning("Denying new third party invite %r because %s", event, e)
raise e
# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = self.hs.hostname

await self._check_signature(event, context)
try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(
event
)
except AuthError as e:
logger.warning(
"Denying new third party invite %r because %s", event, e
)
raise e

# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context)
await self._check_signature(event, context)

# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context)

break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
else:
destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}

Expand Down Expand Up @@ -1400,28 +1421,46 @@ async def on_exchange_third_party_invite_request(
room_version_obj, event_dict
)

event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
event, context = await self.add_display_name_to_third_party_invite(
room_version_obj, event_dict, event, context
)
# Try several times, it could fail with PartialStateConflictError
# in send_membership_event, cf comment in except block.
max_retries = 5
for i in range(max_retries):
try:
(
event,
context,
) = await self.event_creation_handler.create_new_client_event(
builder=builder
)
event, context = await self.add_display_name_to_third_party_invite(
room_version_obj, event_dict, event, context
)

try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(event)
except AuthError as e:
logger.warning("Denying third party invite %r because %s", event, e)
raise e
await self._check_signature(event, context)
try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(event)
except AuthError as e:
logger.warning("Denying third party invite %r because %s", event, e)
raise e
await self._check_signature(event, context)

# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(
event.sender
)

# We need to tell the transaction queue to send this out, even
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context)

# We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass

async def add_display_name_to_third_party_invite(
self,
Expand Down
202 changes: 108 additions & 94 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
AuthError,
Codes,
ConsentNotGivenError,
LimitExceededError,
NotFoundError,
ShadowBanError,
SynapseError,
Expand Down Expand Up @@ -999,60 +998,73 @@ async def create_and_send_nonmember_event(
event.internal_metadata.stream_ordering,
)

event, context = await self.create_event(
requester,
event_dict,
txn_id=txn_id,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
outlier=outlier,
historical=historical,
depth=depth,
)
# Try several times, it could fail with PartialStateConflictError
# in handle_new_client_event, cf comment in except block.
max_retries = 5
for i in range(max_retries):
try:
event, context = await self.create_event(
requester,
event_dict,
txn_id=txn_id,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
outlier=outlier,
historical=historical,
depth=depth,
)

assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
event.sender,
)
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
event.sender,
)

spam_check_result = await self.spam_checker.check_event_for_spam(event)
if spam_check_result != self.spam_checker.NOT_SPAM:
if isinstance(spam_check_result, tuple):
try:
[code, dict] = spam_check_result
raise SynapseError(
403,
"This message had been rejected as probable spam",
code,
dict,
)
except ValueError:
logger.error(
"Spam-check module returned invalid error value. Expecting [code, dict], got %s",
spam_check_result,
)
spam_check_result = await self.spam_checker.check_event_for_spam(event)
if spam_check_result != self.spam_checker.NOT_SPAM:
if isinstance(spam_check_result, tuple):
try:
[code, dict] = spam_check_result
raise SynapseError(
403,
"This message had been rejected as probable spam",
code,
dict,
)
except ValueError:
logger.error(
"Spam-check module returned invalid error value. Expecting [code, dict], got %s",
spam_check_result,
)

raise SynapseError(
403,
"This message has been rejected as probable spam",
Codes.FORBIDDEN,
)
raise SynapseError(
403,
"This message has been rejected as probable spam",
Codes.FORBIDDEN,
)

# Backwards compatibility: if the return value is not an error code, it
# means the module returned an error message to be included in the
# SynapseError (which is now deprecated).
raise SynapseError(
403,
spam_check_result,
Codes.FORBIDDEN,
# Backwards compatibility: if the return value is not an error code, it
# means the module returned an error message to be included in the
# SynapseError (which is now deprecated).
raise SynapseError(
403,
spam_check_result,
Codes.FORBIDDEN,
)

ev = await self.handle_new_client_event(
requester=requester,
events_and_context=[(event, context)],
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)

ev = await self.handle_new_client_event(
requester=requester,
events_and_context=[(event, context)],
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass

# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
Expand Down Expand Up @@ -1356,7 +1368,7 @@ async def handle_new_client_event(

Raises:
ShadowBanError if the requester has been shadow-banned.
SynapseError(503) if attempting to persist a partial state event in
PartialStateConflictError if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
extra_users = extra_users or []
Expand Down Expand Up @@ -1418,34 +1430,23 @@ async def handle_new_client_event(
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
event, context = events_and_context[0]
try:
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_events,
requester=requester,
events_and_context=events_and_context,
ratelimit=ratelimit,
extra_users=extra_users,
),
run_in_background(
self.cache_joined_hosts_for_events, events_and_context
).addErrback(
log_failure, "cache_joined_hosts_for_event failed"
),
result, _ = await make_deferred_yieldable(
gather_results(
(
run_in_background(
self._persist_events,
requester=requester,
events_and_context=events_and_context,
ratelimit=ratelimit,
extra_users=extra_users,
),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
except PartialStateConflictError as e:
# The event context needs to be recomputed.
# Turn the error into a 429, as a hint to the client to try again.
logger.info(
"Room %s was un-partial stated while persisting client event.",
event.room_id,
run_in_background(
self.cache_joined_hosts_for_events, events_and_context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
),
consumeErrors=True,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
).addErrback(unwrapFirstError)

return result

Expand Down Expand Up @@ -2012,26 +2013,39 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool:
for user_id in members:
requester = create_requester(user_id, authenticated_entity=self.server_name)
try:
event, context = await self.create_event(
requester,
{
"type": EventTypes.Dummy,
"content": {},
"room_id": room_id,
"sender": user_id,
},
)
# Try several times, it could fail with PartialStateConflictError
# in handle_new_client_event, cf comment in except block.
max_retries = 5
for i in range(max_retries):
try:
event, context = await self.create_event(
requester,
{
"type": EventTypes.Dummy,
"content": {},
"room_id": room_id,
"sender": user_id,
},
)

event.internal_metadata.proactively_send = False
event.internal_metadata.proactively_send = False

# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
await self.handle_new_client_event(
requester,
events_and_context=[(event, context)],
ratelimit=False,
ignore_shadow_ban=True,
)
# Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user.
await self.handle_new_client_event(
requester,
events_and_context=[(event, context)],
ratelimit=False,
ignore_shadow_ban=True,
)

break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
return True
except AuthError:
logger.info(
Expand Down
Loading