Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add some metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Nov 21, 2016
1 parent 50934ce commit 88d85eb
Showing 1 changed file with 68 additions and 48 deletions.
116 changes: 68 additions & 48 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,16 @@

from .units import Edu

from synapse.util.metrics import Measure
import synapse.metrics

from blist import sorteddict
import ujson


metrics = synapse.metrics.get_metrics_for(__name__)


PRESENCE_TYPE = "p"
KEYED_EDU_TYPE = "k"
EDU_TYPE = "e"
Expand All @@ -49,8 +55,6 @@ def __init__(self, hs):
self.server_name = hs.hostname
self.clock = hs.get_clock()

# TODO: Add metrics for size of lists below

self.presence_map = {}
self.presence_changed = sorteddict()

Expand All @@ -61,10 +65,24 @@ def __init__(self, hs):

self.failures = sorteddict()

self.device_messages = sorteddict()

self.pos = 1
self.pos_time = sorteddict()

self.device_messages = sorteddict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
def register(name, queue):
metrics.register_callback(
queue_name + "_size",
lambda: len(queue),
)

for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
"edus", "failures", "device_messages", "pos_time",
]:
register(queue_name, getattr(self, queue_name))

self.clock.looping_call(self._clear_queue, 30 * 1000)

Expand All @@ -76,7 +94,6 @@ def _next_pos(self):

def _clear_queue(self):
"""Clear the queues for anything older than N minutes"""
# TODO measure this function time.

FIVE_MINUTES_AGO = 5 * 60 * 1000
now = self.clock.time_msec()
Expand All @@ -94,51 +111,54 @@ def _clear_queue(self):

def _clear_queue_before_pos(self, position_to_delete):
"""Clear all the queues from before a given position"""
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]

user_ids = set(
user_id for uids in self.presence_changed.values() for _, user_id in uids
)

to_del = [user_id for user_id in self.presence_map if user_id not in user_ids]
for user_id in to_del:
del self.presence_map[user_id]

# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]

live_keys = set()
for edu_key in self.keyed_edu_changed.values():
live_keys.add(edu_key)

to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
for edu_key in to_del:
del self.keyed_edu[edu_key]

# Delete things out of edu map
keys = self.edus.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]

# Delete things out of failure map
keys = self.failures.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]

user_ids = set(
user_id for uids in self.presence_changed.values() for _, user_id in uids
)

# Delete things out of device map
keys = self.device_messages.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]
to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
]
for user_id in to_del:
del self.presence_map[user_id]

# Delete things out of keyed edus
keys = self.keyed_edu_changed.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.keyed_edu_changed[key]

live_keys = set()
for edu_key in self.keyed_edu_changed.values():
live_keys.add(edu_key)

to_del = [edu_key for edu_key in self.keyed_edu if edu_key not in live_keys]
for edu_key in to_del:
del self.keyed_edu[edu_key]

# Delete things out of edu map
keys = self.edus.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.edus[key]

# Delete things out of failure map
keys = self.failures.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.failures[key]

# Delete things out of device map
keys = self.device_messages.keys()
i = keys.bisect_left(position_to_delete)
for key in keys[:i]:
del self.device_messages[key]

def notify_new_events(self, current_id):
"""As per TransactionQueue"""
Expand Down

0 comments on commit 88d85eb

Please sign in to comment.