Skip to content

Commit

Permalink
Add client side aggregation (#844)
Browse files Browse the repository at this point in the history
* add aggregator  (#841)

* add aggregator WIP

* add metric types enum

* add threading to metrics.py, add unsafe_flush

* fix aggregator

* add tests

* remove print statements

* remove lock for flush unsafe function

* fix removing wrong lock

* remove unnecessary If

* Run tests on any branch

* Change MetricAggregator properties so that it has all necessary fields to send to client

* change aggregator to use MetricAggregator type

* fix test

* fix lint

* remove enums

* fix lint x2

* fix lint x3

* use existing threading function

* remove import

* fix lint

* refactor _stop_flush_thread

* rename variable

* remove client from aggregator

* move changes to base.py to next PR

---------

Co-authored-by: Brian Floersch <brian.floersch@datadoghq.com>

* add flush_function parameter

* refactor base.py to use the aggregator

* revert test.yml change

* WIP

* disable buffering when aggregation is enabled

* add flush_aggregated_metrics function

* check whether or not _start_flush_thread is being called for buffering or aggregation before returning

* WIP

* finish disabling aggregation functions

* move line inside if statement

* fix lint

* fix lint 2

* fix lint

* fix renamed variable

* fix unit tests, rename variables

* check if thread is None

* fix tests

* fix tests

* rerun tests

* add back unicode

* revert lint to forking

* reverse lint changes

* add aggregation to increment

* TESTING

* revert name back to flush_interval

* test

* change setter name

* fix lint

* change order of disable aggregation and buffering

* test

* change order of disable aggregation and buffering

* remove setter design

* set default aggregation time interval

* fix lint

* remove function call

* fix lint

* do not allow buffering when aggregation is enabled

* testing for benchmarks

* check if removing logs fixes test

* add back logs

* remove logs

* rerun tests

* set default aggregation flush interval to 2 seconds

* remove print statement

* address comments

* use a single flush thread

* remove unnecessary lint changes, fix refactored code

* add back import

* fix refactored code

* remove lint and comment

* check lint error

* fix variable naming, deleted test function

* add back imports for comment variables

* fix lint

* test add disable_aggregating to test_fork_hooks

* add parameter

* revert changes

* test

* check for tags = None

* remove test

* fix comment

* rename variable

* add flush function

---------

Co-authored-by: Brian Floersch <brian.floersch@datadoghq.com>
  • Loading branch information
andrewqian2001datadog and gh123man authored Aug 12, 2024
1 parent cabf231 commit 74ffb61
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 110 deletions.
15 changes: 14 additions & 1 deletion datadog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def initialize(
api_host=None, # type: Optional[str]
statsd_host=None, # type: Optional[str]
statsd_port=None, # type: Optional[int]
statsd_disable_aggregator=True, # type: bool
statsd_disable_buffering=True, # type: bool
statsd_aggregation_flush_interval=2, # type: float
statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
Expand Down Expand Up @@ -76,6 +78,14 @@ def initialize(
(default: True).
:type statsd_disable_buffering: boolean
:param statsd_disable_aggregator: Enable/disable statsd client aggregation support
(default: True).
:type statsd_disable_aggregator: boolean
:param statsd_aggregation_flush_interval: Sets the flush interval for aggregation
(default: 2 seconds)
:type statsd_aggregation_flush_interval: float
:param statsd_use_default_route: Dynamically set the statsd host to the default route
(Useful when running the client in a container)
:type statsd_use_default_route: boolean
Expand Down Expand Up @@ -128,8 +138,11 @@ def initialize(
if statsd_constant_tags:
statsd.constant_tags += statsd_constant_tags

if statsd_disable_aggregator:
statsd.disable_aggregation()
else:
statsd.enable_aggregation(statsd_aggregation_flush_interval)
statsd.disable_buffering = statsd_disable_buffering

api._return_raw_response = return_raw_response

# HTTP client and API options
Expand Down
62 changes: 62 additions & 0 deletions datadog/dogstatsd/aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import threading
from datadog.dogstatsd.metrics import (
CountMetric,
GaugeMetric,
SetMetric,
)
from datadog.dogstatsd.metric_types import MetricType


class Aggregator(object):
    """Client-side aggregator for dogstatsd metrics.

    Samples for counts, gauges, and sets are bucketed by metric context
    (name plus joined tags) so that repeated samples between flushes
    collapse into a single aggregated metric. Each metric type keeps its
    own map and its own re-entrant lock, so concurrent updates to
    different metric types never contend with each other.
    """

    def __init__(self):
        # In-flight metrics, one bucket (context -> metric object) per type.
        self.metrics_map = {
            MetricType.COUNT: {},
            MetricType.GAUGE: {},
            MetricType.SET: {},
        }
        # Per-type locks guarding the corresponding bucket above.
        self._locks = {
            MetricType.COUNT: threading.RLock(),
            MetricType.GAUGE: threading.RLock(),
            MetricType.SET: threading.RLock(),
        }

    def flush_aggregated_metrics(self):
        """Drain all buckets and return the aggregated metrics as a list.

        Set metrics expand into one entry per unique value (via
        ``get_data()``); counts and gauges contribute themselves directly.
        Buckets are swapped out under their lock, so new samples arriving
        during a flush land in fresh buckets.
        """
        flushed = []
        for metric_type, lock in self._locks.items():
            with lock:
                drained = self.metrics_map[metric_type]
                self.metrics_map[metric_type] = {}
            for metric in drained.values():
                if isinstance(metric, SetMetric):
                    flushed.extend(metric.get_data())
                else:
                    flushed.append(metric)
        return flushed

    def get_context(self, name, tags):
        """Return the bucket key for *name* and *tags* (``None`` tags -> empty).

        NOTE: tag ORDER matters — the same tags in a different order produce
        a distinct context.
        """
        joined_tags = "" if tags is None else ",".join(tags)
        return "{}:{}".format(name, joined_tags)

    def count(self, name, value, tags, rate, timestamp=0):
        """Aggregate a count sample."""
        return self.add_metric(
            MetricType.COUNT, CountMetric, name, value, tags, rate, timestamp
        )

    def gauge(self, name, value, tags, rate, timestamp=0):
        """Aggregate a gauge sample."""
        return self.add_metric(
            MetricType.GAUGE, GaugeMetric, name, value, tags, rate, timestamp
        )

    def set(self, name, value, tags, rate, timestamp=0):
        """Aggregate a set sample."""
        return self.add_metric(
            MetricType.SET, SetMetric, name, value, tags, rate, timestamp
        )

    def add_metric(
        self, metric_type, metric_class, name, value, tags, rate, timestamp=0
    ):
        """Fold *value* into the existing metric for this context, or create one."""
        context = self.get_context(name, tags)
        with self._locks[metric_type]:
            bucket = self.metrics_map[metric_type]
            existing = bucket.get(context)
            if existing is not None:
                existing.aggregate(value)
            else:
                bucket[context] = metric_class(name, value, tags, rate, timestamp)
Loading

0 comments on commit 74ffb61

Please sign in to comment.