Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Add experimental option to reduce extremities. #5480

Merged
merged 3 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5480.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an EXPERIMENTAL config option to try to periodically clean up extremities by sending dummy events.
6 changes: 6 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ def read_config(self, config):

_check_resource_config(self.listeners)

# An experimental option to try and periodically clean up extremities
# by sending dummy events.
self.cleanup_extremities_with_dummy_events = config.get(
"cleanup_extremities_with_dummy_events", False,
)

def has_tls_listener(self):
    """Report whether any entry in `self.listeners` has a truthy "tls" value.

    Returns:
        bool: True as soon as one listener with TLS set is found, False if
        none is (including when there are no listeners at all).
    """
    for listener in self.listeners:
        if listener["tls"]:
            return True
    return False

Expand Down
12 changes: 12 additions & 0 deletions synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ def is_soft_failed(self):
"""
return getattr(self, "soft_failed", False)

def should_proactively_send(self):
    """Tell whether this event, when it is one of ours, ought to be pushed
    out to other clients and servers.

    Internally generated dummy events use this to opt out of being pushed;
    clients and servers remain free to fetch such an event explicitly.

    Returns:
        bool: the value of the `proactively_send` attribute, or True when
        that attribute has never been set.
    """
    # An unset flag means the normal behaviour: send proactively.
    proactively_send = getattr(self, "proactively_send", True)
    return proactively_send


def _event_dict_property(key):
# We want to be able to use hasattr with the event dict properties.
Expand Down
3 changes: 3 additions & 0 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ def handle_event(event):
if not is_mine and send_on_behalf_of is None:
return

if not event.internal_metadata.should_proactively_send():
return

try:
# Get the state from before the event.
# We need to make sure that this is the state from before
Expand Down
75 changes: 74 additions & 1 deletion synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.validator import EventValidator
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID
from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
Expand Down Expand Up @@ -261,6 +262,18 @@ def __init__(self, hs):
if self._block_events_without_consent_error:
self._consent_uri_builder = ConsentURIBuilder(self.config)

if (
not self.config.worker_app
and self.config.cleanup_extremities_with_dummy_events
):
self.clock.looping_call(
lambda: run_as_background_process(
"send_dummy_events_to_fill_extremities",
self._send_dummy_events_to_fill_extremities
),
5 * 60 * 1000,
)

@defer.inlineCallbacks
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
prev_events_and_hashes=None, require_consent=True):
Expand Down Expand Up @@ -874,3 +887,63 @@ def _bump_active_time(self, user):
yield presence.bump_presence_active_time(user)
except Exception:
logger.exception("Error bumping presence active time")

@defer.inlineCallbacks
def _send_dummy_events_to_fill_extremities(self):
    """Background task to send dummy events into rooms that have a large
    number of extremities.

    Asks the store for up to 5 rooms with many forward extremities
    (``min_count=10``). For each of those rooms in which one of our own
    users is currently a member, an ``org.matrix.dummy_event`` is created on
    top of the room's current prev events and sent on behalf of that user.
    The event is flagged with ``proactively_send = False`` before sending
    (see ``should_proactively_send``).

    Rooms without a local member are skipped.
    """

    room_ids = yield self.store.get_rooms_with_many_extremities(
        min_count=10, limit=5,
    )

    for room_id in room_ids:
        # For each room we need to find a joined member we can use to send
        # the dummy event with.

        prev_events_and_hashes = yield self.store.get_prev_events_for_room(
            room_id,
        )

        # Build a real list rather than a generator expression: a generator
        # can only be consumed once and is always truthy, so a callee that
        # tests it for emptiness or iterates it more than once would
        # silently see the wrong thing.
        latest_event_ids = [
            event_id for (event_id, _, _) in prev_events_and_hashes
        ]

        members = yield self.state.get_current_users_in_room(
            room_id, latest_event_ids=latest_event_ids,
        )

        # Pick the first member that belongs to this homeserver, if any.
        user_id = next(
            (member for member in members if self.hs.is_mine_id(member)),
            None,
        )

        if not user_id:
            # We don't have a joined user.
            # TODO: We should do something here to stop the room from
            # appearing next time.
            continue

        requester = create_requester(user_id)

        event, context = yield self.create_event(
            requester,
            {
                "type": "org.matrix.dummy_event",
                "content": {},
                "room_id": room_id,
                "sender": user_id,
            },
            prev_events_and_hashes=prev_events_and_hashes,
        )

        # Mark the event so that it is not proactively pushed out.
        event.internal_metadata.proactively_send = False

        yield self.send_nonmember_event(
            requester,
            event,
            context,
            ratelimit=False,
        )
29 changes: 29 additions & 0 deletions synapse/storage/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,35 @@ def get_latest_event_ids_and_hashes_in_room(self, room_id):
room_id,
)

def get_rooms_with_many_extremities(self, min_count, limit):
    """Get the top rooms with at least N extremities.

    Args:
        min_count (int): The minimum number of extremities
        limit (int): The maximum number of rooms to return.

    Returns:
        Deferred[list]: At most `limit` room IDs that have at least
        `min_count` extremities, sorted by extremity count (largest
        first).
    """

    def _get_rooms_with_many_extremities_txn(txn):
        # `>=` (not `>`) so that a room with exactly `min_count`
        # extremities qualifies, as the "at least" contract promises.
        sql = """
            SELECT room_id FROM event_forward_extremities
            GROUP BY room_id
            HAVING count(*) >= ?
            ORDER BY count(*) DESC
            LIMIT ?
        """

        txn.execute(sql, (min_count, limit))
        # Each row is a 1-tuple; unpack it to get the bare room ID.
        return [room_id for room_id, in txn]

    return self.runInteraction(
        "get_rooms_with_many_extremities",
        _get_rooms_with_many_extremities_txn,
    )

@cached(max_entries=5000, iterable=True)
def get_latest_event_ids_in_room(self, room_id):
return self._simple_select_onecol(
Expand Down
41 changes: 41 additions & 0 deletions tests/storage/test_cleanup_extrems.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,44 @@ def test_forked_graph_cleanup(self):
self.store.get_latest_event_ids_in_room(self.room_id)
)
self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))


class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
    """Checks that, with `cleanup_extremities_with_dummy_events` enabled, the
    number of forward extremities in a room drops once time has advanced.
    """

    def make_homeserver(self, reactor, clock):
        """Build a test homeserver with the experimental option switched on."""
        config = self.default_config()
        config["cleanup_extremities_with_dummy_events"] = True
        return self.setup_test_homeserver(config=config)

    def prepare(self, reactor, clock, homeserver):
        """Create the user and room that the test sends events into."""
        self.store = homeserver.get_datastore()
        self.room_creator = homeserver.get_room_creation_handler()

        # Create a test user and room
        self.user = UserID("alice", "test")
        self.requester = Requester(self.user, None, False, None, None)
        info = self.get_success(self.room_creator.create_room(self.requester, {}))
        self.room_id = info["room_id"]

    def test_send_dummy_event(self):
        """Fork the room 50 ways, advance time, and expect few extremities."""
        # Create a bushy graph with 50 extremities.

        event_id_start = self.create_and_send_event(self.room_id, self.user)

        for _ in range(50):
            self.create_and_send_event(
                self.room_id, self.user, prev_event_ids=[event_id_start]
            )

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(len(latest_event_ids), 50)

        # Pump the reactor repeatedly so that the background updates have a
        # chance to run.
        self.pump(10 * 60)

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        # assertLess reports both operands on failure, unlike assertTrue on
        # a pre-evaluated comparison.
        self.assertLess(len(latest_event_ids), 10)