
Expose statistics on extrems to prometheus #5384

Merged
merged 16 commits on Jun 13, 2019
1 change: 1 addition & 0 deletions changelog.d/5384.feature
@@ -0,0 +1 @@
+Statistics on forward extremities per room are now exposed via Prometheus.
2 changes: 1 addition & 1 deletion scripts/generate_signing_key.py
@@ -16,7 +16,7 @@
 import argparse
 import sys

-from signedjson.key import write_signing_keys, generate_signing_key
+from signedjson.key import generate_signing_key, write_signing_keys

 from synapse.util.stringutils import random_string

95 changes: 75 additions & 20 deletions synapse/metrics/__init__.py
@@ -25,7 +25,7 @@

 import attr
 from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily

 from twisted.internet import reactor

@@ -40,7 +40,6 @@


 class RegistryProxy(object):
-
     @staticmethod
     def collect():
         for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ def collect(self):
         try:
             calls = self.caller()
         except Exception:
-            logger.exception(
-                "Exception running callback for LaterGauge(%s)",
-                self.name,
-            )
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
             yield g
             return

@@ -116,9 +112,7 @@ def __init__(self, name, desc, labels, sub_metrics):
         # Create a class which has the sub_metrics values as attributes, which
         # default to 0 on initialization. Used to pass to registered callbacks.
         self._metrics_class = attr.make_class(
-            "_MetricsEntry",
-            attrs={x: attr.ib(0) for x in sub_metrics},
-            slots=True,
+            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
         )

         # Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ def collect(self):

         Note: may be called by a separate thread.
         """
-        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+        in_flight = GaugeMetricFamily(
+            self.name + "_total", self.desc, labels=self.labels
+        )

         metrics_by_key = {}

@@ -179,7 +175,9 @@ def collect(self):
         yield in_flight

         for name in self.sub_metrics:
-            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+            gauge = GaugeMetricFamily(
+                "_".join([self.name, name]), "", labels=self.labels
+            )
             for key, metrics in six.iteritems(metrics_by_key):
                 gauge.add_metric(key, getattr(metrics, name))
             yield gauge
@@ -193,12 +191,58 @@ def _register_with_collector(self):
         all_gauges[self.name] = self


+@attr.s(hash=True)
+class BucketCollector(object):
+    """
+    Like a Histogram, but the bucket counts are fetched on demand from a
+    data collector callable rather than observed one sample at a time.
+
+    Args:
+        name (str): base name of the metric to report
+        data_collector (callable): zero-argument callable returning a dict of
+            {observed value: count}; must be synchronous
+        buckets (list): upper bounds of the histogram buckets, in ascending order
+    """
+
+    name = attr.ib()
+    data_collector = attr.ib()
+    buckets = attr.ib()

+    def collect(self):
+
+        # Fetch the data -- this must be synchronous!
+        data = self.data_collector()
+
+        buckets = {}
+
+        res = []
+        for x in data.keys():
+            for bound in self.buckets[:-1]:
+                # Histogram buckets are cumulative: count the sample in every
+                # bucket whose upper bound it falls under.
+                if x <= bound:
+                    buckets[bound] = buckets.get(bound, 0) + data[x]
+
+        for i in self.buckets[:-1]:
+            res.append([str(i), buckets.get(i, 0)])
+
+        res.append(["+Inf", sum(data.values())])
+
+        metric = HistogramMetricFamily(
+            self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
+        )
+        yield metric

+    def __attrs_post_init__(self):
+        self.buckets = [float(x) for x in self.buckets]
+        if self.buckets != sorted(self.buckets):
+            raise ValueError("Buckets not sorted")
+
+        self.buckets = tuple(self.buckets)
+
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering", self.name)
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
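
A minimal usage sketch (not part of the diff; the metric name and numbers are
illustrative only). The collector takes a zero-argument callable returning a
dict of {observed value: count} and registers itself with the global REGISTRY:

    from synapse.metrics import BucketCollector

    # Hypothetical data source: two rooms with 1 forward extremity, one with 3.
    rooms_by_extremity_count = {1: 2, 3: 1}

    BucketCollector(
        "example_forward_extremities",  # hypothetical metric name
        lambda: rooms_by_extremity_count,
        buckets=[1, 2, 5, "+Inf"],
    )

    # On scrape, collect() emits cumulative bucket counts:
    #   le=1.0 -> 2, le=2.0 -> 2, le=5.0 -> 3, +Inf -> 3
    # with sum_value = 1*2 + 3*1 = 5.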


#
# Detailed CPU metrics
#

-class CPUMetrics(object):
-
+class CPUMetrics(object):
     def __init__(self):
         ticks_per_sec = 100
         try:
@@ -237,13 +281,28 @@ def collect(self):
     "python_gc_time",
     "Time taken to GC (sec)",
     ["gen"],
-    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
-             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+    buckets=[
+        0.0025,
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.10,
+        0.25,
+        0.50,
+        1.00,
+        2.50,
+        5.00,
+        7.50,
+        15.00,
+        30.00,
+        45.00,
+        60.00,
+    ],
 )
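
For reference (not part of the diff), values land in a histogram like this via
observe(); a minimal prometheus_client sketch with illustrative names:

    from prometheus_client import Histogram

    # Illustrative only: custom buckets, as for the python_gc_time metric above.
    example_gc_time = Histogram(
        "example_gc_time",
        "Time taken to GC (sec)",
        ["gen"],
        buckets=[0.0025, 0.005, 0.01, 0.025, 0.05],
    )

    # An observation is counted in every bucket whose upper bound it fits under.
    example_gc_time.labels("0").observe(0.004)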


 class GCCounts(object):
-
     def collect(self):
         cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
         for n, m in enumerate(gc.get_count()):
@@ -279,9 +338,7 @@ def collect(self):
 events_processed_counter = Counter("synapse_federation_client_events_processed", "")

 event_processing_loop_counter = Counter(
-    "synapse_event_processing_loop_count",
-    "Event processing loop iterations",
-    ["name"],
+    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
 )
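
As an aside (not in the diff), a labelled Counter like this is bumped once per
loop iteration; the label value below is illustrative:

    # e.g. one iteration of a processing loop named "federation_sender"
    event_processing_loop_counter.labels("federation_sender").inc()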

event_processing_loop_room_count = Counter(
@@ -311,7 +368,6 @@ def collect(self):


 class ReactorLastSeenMetric(object):
-
     def collect(self):
         cm = GaugeMetricFamily(
             "python_twisted_reactor_last_seen",
@@ -325,7 +381,6 @@ def collect(self):


 def runUntilCurrentTimer(func):
-
     @functools.wraps(func)
     def f(*args, **kwargs):
         now = reactor.seconds()
44 changes: 31 additions & 13 deletions synapse/storage/events.py
@@ -17,7 +17,7 @@

 import itertools
 import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
 from functools import wraps

 from six import iteritems, text_type
@@ -33,6 +33,7 @@
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)

         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()

+        # Collect metrics on the number of forward extremities that exist.
+        self._current_forward_extremities_amount = {}
+
+        BucketCollector(
+            "synapse_forward_extremities",
+            lambda: self._current_forward_extremities_amount,
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
+        )
+
+        # Read the extrems every 60 minutes
+        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)

+    @defer.inlineCallbacks
+    def _read_forward_extremities(self):
+        def fetch(txn):
+            txn.execute(
+                """
+                select count(*) c from event_forward_extremities
+                group by room_id
+                """
+            )
+            return txn.fetchall()
+
+        res = yield self.runInteraction("read_forward_extremities", fetch)
+        self._current_forward_extremities_amount = c_counter([x[0] for x in res])
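
The query returns one count(*) row per room; collections.Counter then tallies
how many rooms have each extremity count. A quick sketch with hypothetical rows:

    from collections import Counter as c_counter

    # Hypothetical query result: rooms with 1, 1 and 3 forward extremities.
    res = [(1,), (1,), (3,)]
    # Maps extremity count -> number of rooms with that count.
    assert c_counter([x[0] for x in res]) == c_counter({1: 2, 3: 1})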

     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
@@ -568,17 +594,11 @@ def _get_events_which_are_prevs_txn(txn, batch):
             )

             txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))

         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_events_which_are_prevs",
-                _get_events_which_are_prevs_txn,
-                chunk,
+                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )

         defer.returnValue(results)
@@ -640,9 +660,7 @@ def _get_prevs_before_rejected_txn(txn, batch):

         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_prevs_before_rejected",
-                _get_prevs_before_rejected_txn,
-                chunk,
+                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )

         defer.returnValue(existing_prevs)