Add client side aggregation #844

Merged
merged 68 commits into from
Aug 12, 2024
Changes from 65 commits
Commits
b73d167
add aggregator (#841)
andrewqian2001datadog Jul 16, 2024
480f76f
add flush_function parameter
andrewqian2001datadog Jul 16, 2024
8946cde
refactor base.py to use the aggregator
andrewqian2001datadog Jul 17, 2024
e09faae
revert test.yml change
andrewqian2001datadog Jul 17, 2024
4786aaa
WIP
andrewqian2001datadog Jul 19, 2024
78bb580
disable buffering when aggregation is enabled
andrewqian2001datadog Jul 19, 2024
b7b498a
add flush_aggregated_metrics function
andrewqian2001datadog Jul 19, 2024
3da584c
check whether or not _start_flush_thread is being called for bufferin…
andrewqian2001datadog Jul 22, 2024
eedbec5
WIP
andrewqian2001datadog Jul 22, 2024
78ce146
finish disabling aggregation functions
andrewqian2001datadog Jul 22, 2024
9323a0b
move line inside if statement
andrewqian2001datadog Jul 23, 2024
572c1f3
fix lint
andrewqian2001datadog Jul 23, 2024
cdca124
fix lint 2
andrewqian2001datadog Jul 23, 2024
ed8dde6
fix lint
andrewqian2001datadog Jul 23, 2024
cff8199
fix renamed variable
andrewqian2001datadog Jul 23, 2024
6ee3c81
fix unit tests, rename variables
andrewqian2001datadog Jul 23, 2024
2305ab5
check if thread is None
andrewqian2001datadog Jul 23, 2024
8624749
fix tests
andrewqian2001datadog Jul 23, 2024
7f05577
fix tests
andrewqian2001datadog Jul 23, 2024
d07347b
rerun tests
andrewqian2001datadog Jul 26, 2024
0a3dc9a
Merge branch 'master' into add-client-side-aggregation
andrewqian2001datadog Jul 26, 2024
24b2eef
add back unicode
andrewqian2001datadog Jul 26, 2024
d6f8afa
revert lint to forking
andrewqian2001datadog Jul 26, 2024
974e496
reverse lint changes
andrewqian2001datadog Jul 26, 2024
79c3937
add aggregation to increment
andrewqian2001datadog Jul 29, 2024
f6f69a2
TESTING
andrewqian2001datadog Jul 30, 2024
6b1d497
revert name back to flush_interval
andrewqian2001datadog Jul 30, 2024
be9b099
test
andrewqian2001datadog Jul 30, 2024
e3c6299
change setter name
andrewqian2001datadog Jul 30, 2024
4b7e18d
fix lint
andrewqian2001datadog Jul 30, 2024
ddc4c5f
change order of disable aggregation and buffering
andrewqian2001datadog Jul 30, 2024
435fbf1
test
andrewqian2001datadog Jul 30, 2024
0ed2a5e
change order of disable aggregation and buffering
andrewqian2001datadog Jul 30, 2024
dbf3b64
remove setter design
andrewqian2001datadog Jul 31, 2024
cce0381
set default aggregation time interval
andrewqian2001datadog Jul 31, 2024
498ae71
fix lint
andrewqian2001datadog Jul 31, 2024
266bbc9
remove function call
andrewqian2001datadog Jul 31, 2024
23a67fc
fix lint
andrewqian2001datadog Jul 31, 2024
d3c1b64
do not allow buffering when aggregation is enabled
andrewqian2001datadog Jul 31, 2024
d527cef
testing for benchmarks
andrewqian2001datadog Jul 31, 2024
cc12b2c
check if removing logs fixes test
andrewqian2001datadog Aug 1, 2024
81a833d
add back logs
andrewqian2001datadog Aug 1, 2024
85e1548
remove logs
andrewqian2001datadog Aug 5, 2024
ea9e078
rerun tests
andrewqian2001datadog Aug 5, 2024
4da715b
set default aggregation flush interval to 2 seconds
andrewqian2001datadog Aug 6, 2024
daacfa0
remove print statement
andrewqian2001datadog Aug 6, 2024
9b48744
address comments
andrewqian2001datadog Aug 6, 2024
9b01df4
use a single flush thread
andrewqian2001datadog Aug 7, 2024
720103f
remove unecessary lint changes, fix refactored code
andrewqian2001datadog Aug 7, 2024
a64bc3a
add back import
andrewqian2001datadog Aug 7, 2024
296f756
fix refactored code
andrewqian2001datadog Aug 7, 2024
b010d88
remove lint and comment
andrewqian2001datadog Aug 7, 2024
747fd2c
check lint error
andrewqian2001datadog Aug 8, 2024
05cf94b
fix variable naming, deleted test function
andrewqian2001datadog Aug 8, 2024
7d3a5ea
add back imports for comment variables
andrewqian2001datadog Aug 8, 2024
a3749a4
fix lint
andrewqian2001datadog Aug 8, 2024
a21a551
test add disable_aggregating to test_fork_hooks
andrewqian2001datadog Aug 8, 2024
bc6d0ae
add parameter
andrewqian2001datadog Aug 8, 2024
7e79df0
revert changes
andrewqian2001datadog Aug 8, 2024
05fce38
test
andrewqian2001datadog Aug 8, 2024
3c18ba0
check for tags = None
andrewqian2001datadog Aug 8, 2024
48231a5
remove test
andrewqian2001datadog Aug 8, 2024
24f30f4
fix comment
andrewqian2001datadog Aug 9, 2024
a784757
rename variable
andrewqian2001datadog Aug 9, 2024
aa74073
Merge branch 'master' into add-client-side-aggregation
andrewqian2001datadog Aug 12, 2024
072e3d3
Merge branch 'master' into add-client-side-aggregation
andrewqian2001datadog Aug 12, 2024
13d9d39
add flush function
andrewqian2001datadog Aug 12, 2024
5857802
Merge branch 'add-client-side-aggregation' of github.com:DataDog/data…
andrewqian2001datadog Aug 12, 2024
15 changes: 14 additions & 1 deletion datadog/__init__.py
@@ -37,7 +37,9 @@ def initialize(
    api_host=None,  # type: Optional[str]
    statsd_host=None,  # type: Optional[str]
    statsd_port=None,  # type: Optional[int]
    statsd_disable_aggregator=True,  # type: bool
    statsd_disable_buffering=True,  # type: bool
    statsd_aggregation_flush_interval=2,  # type: float
    statsd_use_default_route=False,  # type: bool
    statsd_socket_path=None,  # type: Optional[str]
    statsd_namespace=None,  # type: Optional[str]
@@ -76,6 +78,14 @@ def initialize(
        (default: True).
    :type statsd_disable_buffering: boolean

    :param statsd_disable_aggregator: Enable/disable statsd client aggregation support
        (default: True).
    :type statsd_disable_aggregator: boolean

    :param statsd_aggregation_flush_interval: Sets the flush interval for aggregation
        (default: 2 seconds)
    :type statsd_aggregation_flush_interval: float

    :param statsd_use_default_route: Dynamically set the statsd host to the default route
        (Useful when running the client in a container)
    :type statsd_use_default_route: boolean
@@ -128,8 +138,11 @@ def initialize(
    if statsd_constant_tags:
        statsd.constant_tags += statsd_constant_tags

    if statsd_disable_aggregator:
        statsd.disable_aggregation()
    else:
        statsd.enable_aggregation(statsd_aggregation_flush_interval)
    statsd.disable_buffering = statsd_disable_buffering

    api._return_raw_response = return_raw_response

    # HTTP client and API options
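The branch added to `initialize()` above makes aggregation opt-in: with the default `statsd_disable_aggregator=True` the client calls `disable_aggregation()`, and only an explicit `False` forwards the flush interval to `enable_aggregation()`. A minimal, self-contained sketch of that control flow follows. `StubStatsd` and `configure_statsd` are hypothetical names used for illustration only; they stand in for the real client and are not part of the library.

```python
class StubStatsd(object):
    """Hypothetical stand-in for the statsd client; it only records calls."""

    def __init__(self):
        self.calls = []

    def disable_aggregation(self):
        self.calls.append(("disable_aggregation",))

    def enable_aggregation(self, flush_interval):
        self.calls.append(("enable_aggregation", flush_interval))


def configure_statsd(statsd, statsd_disable_aggregator=True,
                     statsd_aggregation_flush_interval=2):
    # Mirrors the branch in the diff: aggregation stays off unless the
    # caller explicitly passes statsd_disable_aggregator=False.
    if statsd_disable_aggregator:
        statsd.disable_aggregation()
    else:
        statsd.enable_aggregation(statsd_aggregation_flush_interval)
    return statsd.calls


# Default arguments leave aggregation disabled.
default_calls = configure_statsd(StubStatsd())

# Opting in forwards the flush interval (in seconds) to the client.
opt_in_calls = configure_statsd(
    StubStatsd(),
    statsd_disable_aggregator=False,
    statsd_aggregation_flush_interval=5,
)
```

Going by the signature in this diff, the equivalent opt-in against the real library would be `initialize(statsd_disable_aggregator=False, statsd_aggregation_flush_interval=5)`.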
62 changes: 62 additions & 0 deletions datadog/dogstatsd/aggregator.py
@@ -0,0 +1,62 @@
import threading
from datadog.dogstatsd.metrics import (
    CountMetric,
    GaugeMetric,
    SetMetric,
)
from datadog.dogstatsd.metric_types import MetricType


class Aggregator(object):
    def __init__(self):
        self.metrics_map = {
            MetricType.COUNT: {},
            MetricType.GAUGE: {},
            MetricType.SET: {},
        }
        self._locks = {
            MetricType.COUNT: threading.RLock(),
            MetricType.GAUGE: threading.RLock(),
            MetricType.SET: threading.RLock(),
        }

    def flush_aggregated_metrics(self):
        metrics = []
        for metric_type in self.metrics_map.keys():
            with self._locks[metric_type]:
                current_metrics = self.metrics_map[metric_type]
                self.metrics_map[metric_type] = {}
            for metric in current_metrics.values():
                metrics.extend(metric.get_data() if isinstance(metric, SetMetric) else [metric])
        return metrics

    def get_context(self, name, tags):
        tags_str = ",".join(tags) if tags is not None else ""
        return "{}:{}".format(name, tags_str)

    def count(self, name, value, tags, rate, timestamp=0):
        return self.add_metric(
            MetricType.COUNT, CountMetric, name, value, tags, rate, timestamp
        )

    def gauge(self, name, value, tags, rate, timestamp=0):
        return self.add_metric(
            MetricType.GAUGE, GaugeMetric, name, value, tags, rate, timestamp
        )

    def set(self, name, value, tags, rate, timestamp=0):
        return self.add_metric(
            MetricType.SET, SetMetric, name, value, tags, rate, timestamp
        )

    def add_metric(
        self, metric_type, metric_class, name, value, tags, rate, timestamp=0
    ):
        context = self.get_context(name, tags)
        with self._locks[metric_type]:
            if context in self.metrics_map[metric_type]:
                self.metrics_map[metric_type][context].aggregate(value)
            else:
                self.metrics_map[metric_type][context] = metric_class(
                    name, value, tags, rate, timestamp
                )
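To illustrate what the `Aggregator` above does between flushes, here is a self-contained sketch that reuses the same context key (`name` plus the comma-joined tags) and the same swap-under-lock flush. The metric classes from `datadog/dogstatsd/metrics.py` are not shown in this diff, so the `Count` and `Gauge` classes below are assumptions: counts are taken to sum, gauges to keep the last value. `MiniAggregator` is a hypothetical name, and it keys a single map by `(class, context)` instead of keeping one map and one lock per metric type.

```python
import threading


class Count(object):
    # Assumed semantics: values reported for the same context are summed.
    def __init__(self, name, value, tags):
        self.name, self.value, self.tags = name, value, tags

    def aggregate(self, value):
        self.value += value


class Gauge(object):
    # Assumed semantics: only the most recent value is kept.
    def __init__(self, name, value, tags):
        self.name, self.value, self.tags = name, value, tags

    def aggregate(self, value):
        self.value = value


class MiniAggregator(object):
    def __init__(self):
        self.metrics = {}
        self._lock = threading.RLock()

    @staticmethod
    def get_context(name, tags):
        # Same key construction as get_context() in the diff.
        tags_str = ",".join(tags) if tags is not None else ""
        return "{}:{}".format(name, tags_str)

    def add(self, metric_class, name, value, tags=None):
        key = (metric_class, self.get_context(name, tags))
        with self._lock:
            if key in self.metrics:
                self.metrics[key].aggregate(value)
            else:
                self.metrics[key] = metric_class(name, value, tags)

    def flush(self):
        # Swap in an empty map while holding the lock, then read the old
        # map outside it so writers are blocked only for the swap itself.
        with self._lock:
            current, self.metrics = self.metrics, {}
        return list(current.values())


agg = MiniAggregator()
for _ in range(3):
    agg.add(Count, "page.views", 1, ["env:prod"])
agg.add(Gauge, "queue.depth", 10)
agg.add(Gauge, "queue.depth", 4)
# The tags are joined in the order given, so the same tags in a different
# order produce a different context key and are not merged.
agg.add(Count, "page.views", 1, ["a:1", "b:2"])
agg.add(Count, "page.views", 1, ["b:2", "a:1"])

flushed = {
    (type(m).__name__, m.name, tuple(m.tags or ())): m.value
    for m in agg.flush()
}
```

Seven calls collapse into four flushed metrics here. The last two calls show a property of the key as written in this diff: because the tags are not sorted before joining, callers that pass the same tags in a different order end up with separate contexts.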