Skip to content

Commit ae1c777

Browse files
committed
Added support for central config management (#511)
The agent will periodically poll the APM Server for configuration changes and apply them if a new configuration is detected. Currently, only `transaction_sample_rate` is supported. Closes #511.
1 parent 22d4f3f commit ae1c777

15 files changed

+377
-43
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
## Other changes
1111

1212
* Added support for recording breakdown metrics (#535)
13+
* Added support for central config management (#511)
1314
* Added instrumentation for `urllib2` (Python 2) / `urllib.request` (Python 3) (#464)
1415
* Added `disable_metrics` setting (#399)
1516
* Updated elasticsearch instrumentation for 7.x (#482, #483)

docs/configuration.asciidoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,22 @@ NOTE: this setting only disables the *sending* of the given metrics, not collect
649649
Enable/disable the tracking and collection of breakdown metrics.
650650
By setting this to `False`, tracking this metric is completely disabled, which can reduce the overhead of the agent.
651651

652+
NOTE: This feature requires APM Server and Kibana >= 7.3.
653+
654+
[float]
655+
[[config-central_config]]
656+
==== `central_config`
657+
|============
658+
| Environment | Django/Flask | Default
659+
| `ELASTIC_APM_CENTRAL_CONFIG` | `CENTRAL_CONFIG` | `True`
660+
|============
661+
662+
When enabled, the agent will make periodic requests to the APM Server to fetch updated configuration.
663+
664+
See {kibana-ref}/agent-configuration.html#agent-configuration[APM Agent Configuration] for more documentation on central agent configuration.
665+
666+
NOTE: This feature requires APM Server and Kibana >= 7.3.
667+
652668
[float]
653669
[[config-django-specific]]
654670
=== Django-specific configuration

elasticapm/base.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@
4242
from copy import deepcopy
4343

4444
import elasticapm
45-
from elasticapm.conf import Config, constants
45+
from elasticapm.conf import Config, VersionedConfig, constants, update_config
4646
from elasticapm.conf.constants import ERROR
4747
from elasticapm.metrics.base_metrics import MetricsRegistry
4848
from elasticapm.traces import Tracer, execution_context
4949
from elasticapm.utils import cgroup, compat, is_master_process, stacks, varmap
5050
from elasticapm.utils.encoding import keyword_field, shorten, transform
5151
from elasticapm.utils.module_import import import_string
52+
from elasticapm.utils.threading import IntervalTimer
5253

5354
__all__ = ("Client",)
5455

@@ -95,11 +96,12 @@ def __init__(self, config=None, **inline):
9596
self.filter_exception_types_dict = {}
9697
self._service_info = None
9798

98-
self.config = Config(config, inline_dict=inline)
99-
if self.config.errors:
100-
for msg in self.config.errors.values():
99+
config = Config(config, inline_dict=inline)
100+
if config.errors:
101+
for msg in config.errors.values():
101102
self.error_logger.error(msg)
102-
self.config.disable_send = True
103+
config.disable_send = True
104+
self.config = VersionedConfig(config, version=None)
103105

104106
headers = {
105107
"Content-Type": "application/x-ndjson",
@@ -157,10 +159,7 @@ def __init__(self, config=None, **inline):
157159
),
158160
),
159161
queue_func=self.queue,
160-
sample_rate=self.config.transaction_sample_rate,
161-
max_spans=self.config.transaction_max_spans,
162-
span_frames_min_duration=self.config.span_frames_min_duration,
163-
ignore_patterns=self.config.transactions_ignore_patterns,
162+
config=self.config,
164163
agent=self,
165164
)
166165
self.include_paths_re = stacks.get_path_regex(self.config.include_paths) if self.config.include_paths else None
@@ -173,6 +172,13 @@ def __init__(self, config=None, **inline):
173172
if self.config.breakdown_metrics:
174173
self._metrics.register("elasticapm.metrics.sets.breakdown.BreakdownMetricSet")
175174
compat.atexit_register(self.close)
175+
if self.config.central_config:
176+
self._config_updater = IntervalTimer(
177+
update_config, 1, "eapm conf updater", daemon=True, args=(self,), evaluate_function_interval=True
178+
)
179+
self._config_updater.start()
180+
else:
181+
self._config_updater = None
176182

177183
def get_handler(self, name):
178184
return import_string(name)
@@ -241,6 +247,8 @@ def end_transaction(self, name=None, result=""):
241247
def close(self):
242248
if self._metrics:
243249
self._metrics._stop_collect_timer()
250+
if self._config_updater:
251+
self._config_updater.cancel()
244252
self._transport.close()
245253

246254
def get_service_info(self):

elasticapm/conf/__init__.py

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,14 @@
3333
import os
3434
import re
3535
import socket
36+
import threading
3637

3738
from elasticapm.utils import compat, starmatch_to_regex
3839

3940
__all__ = ("setup_logging", "Config")
4041

42+
logger = logging.getLogger("elasticapm.conf")
43+
4144

4245
class ConfigurationError(ValueError):
4346
def __init__(self, msg, field_name):
@@ -64,8 +67,6 @@ def __get__(self, instance, owner):
6467

6568
def __set__(self, instance, value):
6669
value = self._validate(instance, value)
67-
if value is not None:
68-
value = self.type(value)
6970
instance._values[self.dict_key] = value
7071

7172
def _validate(self, instance, value):
@@ -76,6 +77,11 @@ def _validate(self, instance, value):
7677
if self.validators and value is not None:
7778
for validator in self.validators:
7879
value = validator(value, self.dict_key)
80+
if self.type and value is not None:
81+
try:
82+
value = self.type(value)
83+
except ValueError as e:
84+
raise ConfigurationError("{}: {}".format(self.dict_key, compat.text_type(e)), self.dict_key)
7985
instance._errors.pop(self.dict_key, None)
8086
return value
8187

@@ -183,6 +189,9 @@ class _ConfigBase(object):
183189
def __init__(self, config_dict=None, env_dict=None, inline_dict=None):
184190
self._values = {}
185191
self._errors = {}
192+
self.update(config_dict, env_dict, inline_dict)
193+
194+
def update(self, config_dict=None, env_dict=None, inline_dict=None):
186195
if config_dict is None:
187196
config_dict = {}
188197
if env_dict is None:
@@ -209,6 +218,14 @@ def __init__(self, config_dict=None, env_dict=None, inline_dict=None):
209218
except ConfigurationError as e:
210219
self._errors[e.field_name] = str(e)
211220

221+
@property
222+
def values(self):
223+
return self._values
224+
225+
@values.setter
226+
def values(self, values):
227+
self._values = values
228+
212229
@property
213230
def errors(self):
214231
return self._errors
@@ -263,6 +280,7 @@ class Config(_ConfigBase):
263280
)
264281
breakdown_metrics = _BoolConfigValue("BREAKDOWN_METRICS", default=True)
265282
disable_metrics = _ListConfigValue("DISABLE_METRICS", type=starmatch_to_regex, default=[])
283+
central_config = _BoolConfigValue("CENTRAL_CONFIG", default=True)
266284
api_request_size = _ConfigValue("API_REQUEST_SIZE", type=int, validators=[size_validator], default=750 * 1024)
267285
api_request_time = _ConfigValue("API_REQUEST_TIME", type=int, validators=[duration_validator], default=10 * 1000)
268286
transaction_sample_rate = _ConfigValue("TRANSACTION_SAMPLE_RATE", type=float, default=1.0)
@@ -296,6 +314,95 @@ class Config(_ConfigBase):
296314
django_transaction_name_from_route = _BoolConfigValue("DJANGO_TRANSACTION_NAME_FROM_ROUTE", default=False)
297315

298316

317+
class VersionedConfig(object):
    """
    A thin layer around Config that provides versioning.

    The wrapper keeps the currently active Config object together with a version
    identifier, and remembers the initial pair so it can be restored with ``reset()``.
    Attribute access for anything that is not part of the wrapper's own state is
    forwarded to the active Config object.
    """

    __slots__ = ("_config", "_version", "_first_config", "_first_version", "_lock")

    def __init__(self, config_object, version):
        """
        Create a new VersionedConfig with an initial Config object

        :param config_object: the initial Config object
        :param version: a version identifier for the configuration
        """
        self._config = self._first_config = config_object
        self._version = self._first_version = version
        # guards the paired assignment of version and config in update() and reset()
        self._lock = threading.Lock()

    def update(self, version, **config):
        """
        Update the configuration version

        The new values are layered on top of a copy of the active configuration's
        values. The active configuration is only replaced if the result validates
        without errors.

        :param version: version identifier for the new configuration
        :param config: a key/value map of new configuration
        :return: configuration errors, if any; ``None`` when the update was applied
        """
        new_config = Config()
        new_config.values = self._config.values.copy()

        # pass an empty env dict to ensure the environment doesn't get precedence
        new_config.update(inline_dict=config, env_dict={})
        if new_config.errors:
            return new_config.errors
        with self._lock:
            self._version = version
            self._config = new_config
        return None

    def reset(self):
        """
        Reset state to the original configuration
        """
        with self._lock:
            self._version = self._first_version
            self._config = self._first_config

    @property
    def changed(self):
        """
        Whether the active Config object differs from the initial one
        """
        return self._config != self._first_config

    def __getattr__(self, item):
        # __getattr__ only runs after normal lookup failed. For one of the wrapper's own
        # slots that means the slot has not been assigned yet (e.g. on an instance created
        # without __init__, as copy/pickle do). Delegating would read self._config, which
        # would land here again and recurse without bound, so fail cleanly instead.
        if item in self.__slots__:
            raise AttributeError(item)
        return getattr(self._config, item)

    def __setattr__(self, name, value):
        # the wrapper's own state lives in its slots; everything else is set on the active Config
        if name not in self.__slots__:
            setattr(self._config, name, value)
        else:
            super(VersionedConfig, self).__setattr__(name, value)

    @property
    def config_version(self):
        """
        The version identifier of the active configuration
        """
        return self._version
377+
378+
379+
def update_config(agent):
    """
    Ask the agent's transport for remote configuration once and act on the answer.

    A non-empty remote configuration with a version is applied to ``agent.config``;
    validation errors are logged and leave the active configuration untouched. If
    the reported version equals the active one, nothing happens. If no remote
    configuration is returned and the active configuration has changed from the
    initial one, the initial configuration is restored.

    :param agent: the agent (client) object whose ``config`` should be refreshed
    :return: the ``next_run`` value reported by the transport's ``get_config``
    """
    logger.debug("Checking for new config...")
    config = agent.config
    service = {"name": config.service_name}
    environment = config.environment
    if environment:
        service["environment"] = environment
    remote_version, remote_config, next_run = agent._transport.get_config(
        config.config_version, {"service": service}
    )

    if remote_version and remote_config:
        errors = config.update(remote_version, **remote_config)
        if not errors:
            applied = [
                "%s=%s" % (compat.text_type(key), compat.text_type(value))
                for key, value in compat.iteritems(remote_config)
            ]
            logger.info("Applied new configuration: %s", "; ".join(applied))
        else:
            logger.error("Error applying new configuration: %s", repr(errors))
        return next_run

    if remote_version == config.config_version:
        logger.debug("Remote config unchanged")
    elif not remote_config and config.changed:
        logger.debug("Remote config disappeared, resetting to original")
        config.reset()
    return next_run
404+
405+
299406
def setup_logging(handler, exclude=("gunicorn", "south", "elasticapm.errors")):
300407
"""
301408
Configures logging to pipe to Elastic APM.

elasticapm/conf/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import decimal
3232

3333
EVENTS_API_PATH = "intake/v2/events"
34+
AGENT_CONFIG_PATH = "config/v1/agents"
3435

3536
TRACE_CONTEXT_VERSION = 0
3637
TRACEPARENT_HEADER_NAME = "elastic-apm-traceparent"

elasticapm/traces.py

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def _begin_span(
212212
tracer = self.tracer
213213
if parent_span and parent_span.leaf:
214214
span = DroppedSpan(parent_span, leaf=True)
215-
elif tracer.max_spans and self._span_counter > tracer.max_spans - 1:
215+
elif tracer.config.transaction_max_spans and self._span_counter > tracer.config.transaction_max_spans - 1:
216216
self.dropped_spans += 1
217217
span = DroppedSpan(parent_span)
218218
self._span_counter += 1
@@ -450,29 +450,18 @@ def child_ended(self, timestamp):
450450

451451

452452
class Tracer(object):
453-
def __init__(
454-
self,
455-
frames_collector_func,
456-
frames_processing_func,
457-
queue_func,
458-
sample_rate=1.0,
459-
max_spans=0,
460-
span_frames_min_duration=None,
461-
ignore_patterns=None,
462-
agent=None,
463-
):
464-
self.max_spans = max_spans
453+
def __init__(self, frames_collector_func, frames_processing_func, queue_func, config, agent):
454+
self.config = config
465455
self.queue_func = queue_func
466456
self.frames_processing_func = frames_processing_func
467457
self.frames_collector_func = frames_collector_func
468-
self._ignore_patterns = [re.compile(p) for p in ignore_patterns or []]
469-
self._sample_rate = sample_rate
470458
self._agent = agent
471-
if span_frames_min_duration in (-1, None):
459+
self._ignore_patterns = [re.compile(p) for p in config.transactions_ignore_patterns or []]
460+
if config.span_frames_min_duration in (-1, None):
472461
# both None and -1 mean "no minimum"
473462
self.span_frames_min_duration = None
474463
else:
475-
self.span_frames_min_duration = span_frames_min_duration / 1000.0
464+
self.span_frames_min_duration = config.span_frames_min_duration / 1000.0
476465

477466
def begin_transaction(self, transaction_type, trace_parent=None):
478467
"""
@@ -483,7 +472,9 @@ def begin_transaction(self, transaction_type, trace_parent=None):
483472
if trace_parent:
484473
is_sampled = bool(trace_parent.trace_options.recorded)
485474
else:
486-
is_sampled = self._sample_rate == 1.0 or self._sample_rate > random.random()
475+
is_sampled = (
476+
self.config.transaction_sample_rate == 1.0 or self.config.transaction_sample_rate > random.random()
477+
)
487478
transaction = Transaction(self, transaction_type, trace_parent=trace_parent, is_sampled=is_sampled)
488479
if trace_parent is None:
489480
transaction.trace_parent = TraceParent(

elasticapm/transport/http.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import hashlib
3434
import logging
3535
import os
36+
import re
3637
import ssl
3738

3839
import certifi
@@ -41,7 +42,7 @@
4142

4243
from elasticapm.transport.base import TransportException
4344
from elasticapm.transport.http_base import AsyncHTTPTransportBase, HTTPTransportBase
44-
from elasticapm.utils import compat, read_pem_file
45+
from elasticapm.utils import compat, json_encoder, read_pem_file
4546

4647
logger = logging.getLogger("elasticapm.transport.http")
4748

@@ -103,6 +104,35 @@ def send(self, data):
103104
if response:
104105
response.close()
105106

107+
def get_config(self, current_version=None, keys=None):
    """
    Fetch remote agent configuration from the configured config URL.

    :param current_version: version identifier of the configuration currently in use;
        when truthy it is sent as the ``If-None-Match`` request header
    :param keys: JSON-serializable data sent as the request body
    :return: a ``(version, config, max_age)`` tuple. ``version`` is the response's
        ``Etag`` header and ``config`` the decoded JSON body on success. On a request
        error or a 304 response, ``(current_version, None, max_age)`` is returned; on a
        status >= 400, ``(None, None, max_age)``. ``max_age`` is taken from the
        response's ``Cache-Control`` header and falls back to 300 otherwise.
    """
    url = self._config_url
    data = json_encoder.dumps(keys).encode("utf-8")
    headers = self._headers.copy()
    # fallback used when the response carries no parsable Cache-Control max-age
    max_age = 300
    if current_version:
        headers["If-None-Match"] = current_version
    try:
        response = self.http.urlopen(
            "POST", url, body=data, headers=headers, timeout=self._timeout, preload_content=False
        )
    except (urllib3.exceptions.RequestError, urllib3.exceptions.HTTPError) as e:
        logger.debug("HTTP error while fetching remote config: %s", compat.text_type(e))
        return current_version, None, max_age
    body = response.read()
    if "Cache-Control" in response.headers:
        cache_control = response.headers["Cache-Control"]
        match = re.search(r"max-age=(\d+)", cache_control)
        if match:
            max_age = int(match.group(1))
        else:
            # the header value lives on response.headers; the response object itself is not subscriptable
            logger.debug("Could not parse Cache-Control header: %s", cache_control)
    if response.status == 304:
        # config is unchanged, return
        logger.debug("Configuration unchanged")
        return current_version, None, max_age
    elif response.status >= 400:
        return None, None, max_age

    return response.headers.get("Etag"), json_encoder.loads(body.decode("utf-8")), max_age
135+
106136
@property
107137
def cert_fingerprint(self):
108138
if self._server_cert:

0 commit comments

Comments
 (0)