Skip to content

Commit 1248ca9

Browse files
committed
first stab at implementing metrics [WIP]
1 parent 929fcc5 commit 1248ca9

File tree

6 files changed

+183
-4
lines changed

6 files changed

+183
-4
lines changed

elasticapm/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import elasticapm
2525
from elasticapm.conf import Config, constants
2626
from elasticapm.conf.constants import ERROR
27+
from elasticapm.metrics.base_metrics import MetricsRegistry
2728
from elasticapm.traces import Tracer, get_transaction
2829
from elasticapm.utils import compat, is_master_process, stacks, varmap
2930
from elasticapm.utils.encoding import keyword_field, shorten, transform
@@ -140,6 +141,9 @@ def __init__(self, config=None, **inline):
140141
)
141142
self.include_paths_re = stacks.get_path_regex(self.config.include_paths) if self.config.include_paths else None
142143
self.exclude_paths_re = stacks.get_path_regex(self.config.exclude_paths) if self.config.exclude_paths else None
144+
self._metrics = MetricsRegistry(self.config.metrics_interval / 1000.0, self.queue)
145+
for path in self.config.metrics_sets:
146+
self._metrics.register(path)
143147
compat.atexit_register(self.close)
144148

145149
def get_handler(self, name):

elasticapm/conf/__init__.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,29 @@ def __call__(self, value, field_name):
122122
return val
123123

124124

125-
duration_validator = UnitValidator("^((?:-)?\d+)(ms|s|m)$", "\d+(ms|s|m)", {"ms": 1, "s": 1000, "m": 60000})
125+
duration_validator = UnitValidator(r"^((?:-)?\d+)(ms|s|m)$", r"\d+(ms|s|m)", {"ms": 1, "s": 1000, "m": 60000})
126126
size_validator = UnitValidator(
127-
"^(\d+)(b|kb|mb|gb)$", "\d+(b|KB|MB|GB)", {"b": 1, "kb": 1024, "mb": 1024 * 1024, "gb": 1024 * 1024 * 1024}
127+
r"^(\d+)(b|kb|mb|gb)$", r"\d+(b|KB|MB|GB)", {"b": 1, "kb": 1024, "mb": 1024 * 1024, "gb": 1024 * 1024 * 1024}
128128
)
129129

130130

131+
class ExcludeRangeValidator(object):
132+
def __init__(self, range_start, range_end, range_desc):
133+
self.range_start = range_start
134+
self.range_end = range_end
135+
self.range_desc = range_desc
136+
137+
def __call__(self, value, field_name):
138+
if self.range_start <= value <= self.range_end:
139+
raise ConfigurationError(
140+
"{} cannot be in range: {}".format(
141+
value, self.range_desc.format(**{"range_start": self.range_start, "range_end": self.range_end})
142+
),
143+
field_name,
144+
)
145+
return value
146+
147+
131148
class _ConfigBase(object):
132149
_NO_VALUE = object() # sentinel object
133150

@@ -178,7 +195,9 @@ class Config(_ConfigBase):
178195
server_timeout = _ConfigValue(
179196
"SERVER_TIMEOUT",
180197
type=float,
181-
validators=[UnitValidator("^((?:-)?\d+)(ms|s|m)?$", "\d+(ms|s|m)", {"ms": 0.001, "s": 1, "m": 60, None: 1000})],
198+
validators=[
199+
UnitValidator(r"^((?:-)?\d+)(ms|s|m)?$", r"\d+(ms|s|m)", {"ms": 0.001, "s": 1, "m": 60, None: 1000})
200+
],
182201
default=5,
183202
)
184203
hostname = _ConfigValue("HOSTNAME", default=socket.gethostname())
@@ -196,14 +215,23 @@ class Config(_ConfigBase):
196215
"elasticapm.processors.sanitize_http_request_body",
197216
],
198217
)
218+
metrics_sets = _ListConfigValue("METRICS_SETS", default=["elasticapm.metrics.cpu_psutil.CPUMetricSet"])
219+
metrics_interval = _ConfigValue(
220+
"METRICS_INTERVAL",
221+
type=int,
222+
validators=[duration_validator, ExcludeRangeValidator(1, 999, "{range_start} - {range_end} ms")],
223+
default=30000,
224+
)
199225
api_request_size = _ConfigValue("API_REQUEST_SIZE", type=int, validators=[size_validator], default=750 * 1024)
200226
api_request_time = _ConfigValue("API_REQUEST_TIME", type=int, validators=[duration_validator], default=10 * 1000)
201227
transaction_sample_rate = _ConfigValue("TRANSACTION_SAMPLE_RATE", type=float, default=1.0)
202228
transaction_max_spans = _ConfigValue("TRANSACTION_MAX_SPANS", type=int, default=500)
203229
span_frames_min_duration = _ConfigValue(
204230
"SPAN_FRAMES_MIN_DURATION",
205231
default=5,
206-
validators=[UnitValidator("^((?:-)?\d+)(ms|s|m)?$", "\d+(ms|s|m)", {"ms": 1, "s": 1000, "m": 60000, None: 1})],
232+
validators=[
233+
UnitValidator(r"^((?:-)?\d+)(ms|s|m)?$", r"\d+(ms|s|m)", {"ms": 1, "s": 1000, "m": 60000, None: 1})
234+
],
207235
type=int,
208236
)
209237
collect_local_variables = _ConfigValue("COLLECT_LOCAL_VARIABLES", default="errors")

elasticapm/metrics/__init__.py

Whitespace-only changes.

elasticapm/metrics/base_metrics.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import logging
2+
import threading
3+
import time
4+
5+
from elasticapm.utils import compat
6+
from elasticapm.utils.module_import import import_string
7+
8+
logger = logging.getLogger("elasticapm.metrics")
9+
10+
11+
class MetricsRegistry(object):
12+
def __init__(self, collect_interval, queue_func, tags=None):
13+
self._collect_interval = collect_interval
14+
self._queue_func = queue_func
15+
self._metricsets = {}
16+
self._tags = tags or {}
17+
self._collect_timer = None
18+
if self._collect_interval:
19+
self._start_collect_timer()
20+
21+
def register(self, class_path):
22+
if class_path in self._metricsets:
23+
return
24+
else:
25+
try:
26+
class_obj = import_string(class_path)
27+
self._metricsets[class_path] = class_obj()
28+
except ImportError as e:
29+
logger.warning("Could not register %s metricset: %s", class_path, compat.text_type(e))
30+
31+
def collect(self, start_timer=True):
32+
logger.info("Starting metrics collect timer")
33+
34+
for name, metricset in compat.iteritems(self._metricsets):
35+
data = metricset.collect()
36+
if data:
37+
self._queue_func("metricset", data)
38+
39+
if start_timer:
40+
self._start_collect_timer()
41+
42+
def _start_collect_timer(self, timeout=None):
43+
timeout = timeout or self._collect_interval
44+
self._collect_timer = threading.Timer(timeout, self.collect)
45+
self._collect_timer.name = "elasticapm metrics collect timer"
46+
self._collect_timer.daemon = True
47+
logger.info("Starting metrics collect timer")
48+
self._collect_timer.start()
49+
50+
def _stop_collect_timer(self):
51+
if self._collect_timer:
52+
logger.info("Cancelling collect timer")
53+
self._collect_timer.cancel()
54+
55+
56+
class MetricsSet(object):
57+
def __init__(self):
58+
self._lock = threading.Lock()
59+
self._counters = {}
60+
self._gauges = {}
61+
62+
def counter(self, name):
63+
with self._lock:
64+
if name not in self._counters:
65+
self._counters[name] = Counter(name)
66+
return self._counters[name]
67+
68+
def gauge(self, name):
69+
with self._lock:
70+
if name not in self._gauges:
71+
self._gauges[name] = Gauge(name)
72+
return self._gauges[name]
73+
74+
def collect(self):
75+
samples = {}
76+
if self._counters:
77+
samples.update({label: {"value": c.val} for label, c in compat.iteritems(self._counters)})
78+
if self._gauges:
79+
samples.update({label: {"value": g.val} for label, g in compat.iteritems(self._gauges)})
80+
if samples:
81+
return {"samples": samples, "timestamp": int(time.time() * 1000000)}
82+
83+
84+
class Counter(object):
85+
def __init__(self, label):
86+
self.label = label
87+
self._lock = threading.Lock()
88+
self._val = 0
89+
90+
def inc(self, delta=1):
91+
with self._lock:
92+
self._val += delta
93+
94+
def dec(self, delta=1):
95+
with self._lock:
96+
self._val += delta
97+
98+
def reset(self):
99+
with self._lock:
100+
self._val = 0
101+
102+
@property
103+
def val(self):
104+
return self._val
105+
106+
107+
class Gauge(object):
108+
def __init__(self, label):
109+
self.label = label
110+
self._val = None
111+
112+
@property
113+
def val(self):
114+
return self._val
115+
116+
@val.setter
117+
def val(self, value):
118+
self._val = value

elasticapm/metrics/cpu_psutil.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from elasticapm.metrics.base_metrics import MetricsSet
2+
3+
try:
4+
import psutil
5+
except ImportError:
6+
raise ImportError("psutil not found. Install it to get system and process metrics")
7+
8+
9+
class CPUMetricSet(MetricsSet):
10+
def __init__(self):
11+
psutil.cpu_percent(interval=None)
12+
super(CPUMetricSet, self).__init__()
13+
14+
def collect(self):
15+
cpu_count = psutil.cpu_count()
16+
self.gauge("system.cpu.total.norm.pct").val = psutil.cpu_percent(interval=None) / cpu_count
17+
self.gauge("system.memory.actual.free").val = psutil.virtual_memory().available
18+
self.gauge("system.memory.total").val = psutil.virtual_memory().total
19+
20+
p = psutil.Process()
21+
with p.oneshot():
22+
memory_info = p.memory_full_info()
23+
self.gauge("system.process.cpu.total.norm.pct").val = p.cpu_percent() / cpu_count
24+
self.gauge("system.process.memory.size").val = memory_info.vms
25+
self.gauge("system.process.memory.rss.bytes").val = memory_info.rss
26+
return super(CPUMetricSet, self).collect()

elasticapm/transport/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import threading
55
import timeit
6+
from collections import defaultdict
67

78
from elasticapm.contrib.async_worker import AsyncWorker
89
from elasticapm.utils import json_encoder
@@ -56,11 +57,13 @@ def __init__(
5657
self._queue_lock = threading.Lock()
5758
self._last_flush = timeit.default_timer()
5859
self._flush_timer = None
60+
self._counts = defaultdict(int)
5961

6062
def queue(self, event_type, data, flush=False):
6163
with self._queue_lock:
6264
queued_data = self.queued_data
6365
queued_data.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8"))
66+
self._counts[event_type] += 1
6467
since_last_flush = timeit.default_timer() - self._last_flush
6568
queue_size = 0 if queued_data.fileobj is None else queued_data.fileobj.tell()
6669
if flush:

0 commit comments

Comments
 (0)