Skip to content

Commit 5337c43

Browse files
committed
introduce IntervalTimer and use it instead of threading.Timer (#453)
`threading.Timer` only runs the function once, and then stops the thread. To get an interval timer, we used to start a new thread when the original thread expired. This is wasteful, and in the worst case can lead to runoff creation of timer threads (#450). Instead, the `IntervalTimer` will run in an endless loop, executing the given function every n seconds. closes #453
1 parent 7764138 commit 5337c43

File tree

4 files changed

+121
-8
lines changed

4 files changed

+121
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v4.2.1...master)
55
* fixed an issue with Celery and the prefork worker pool (#444)
66
* fixed an issue when running uwsgi without a master process (#446)
7+
* introduced `IntervalTimer` and use it instead of `threading.Timer` (#452)
78

89
## v4.2.1
910
[Check the diff](https://github.com/elastic/apm-agent-python/compare/v4.2.0...v4.2.1)

elasticapm/metrics/base_metrics.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
from elasticapm.utils import compat, is_master_process
3636
from elasticapm.utils.module_import import import_string
37+
from elasticapm.utils.threading import IntervalTimer
3738

3839
logger = logging.getLogger("elasticapm.metrics")
3940

@@ -74,15 +75,11 @@ def register(self, class_path):
7475
except ImportError as e:
7576
logger.warning("Could not register %s metricset: %s", class_path, compat.text_type(e))
7677

77-
def collect(self, start_timer=True):
78+
def collect(self):
7879
"""
7980
Collect metrics from all registered metric sets
80-
:param start_timer: if True, restarts the collect timer after collection
8181
:return:
8282
"""
83-
if start_timer:
84-
self._start_collect_timer()
85-
8683
logger.debug("Collecting metrics")
8784

8885
for name, metricset in compat.iteritems(self._metricsets):
@@ -92,9 +89,7 @@ def collect(self, start_timer=True):
9289

9390
def _start_collect_timer(self, timeout=None):
9491
timeout = timeout or self._collect_interval
95-
self._collect_timer = threading.Timer(timeout, self.collect, kwargs={"start_timer": True})
96-
self._collect_timer.name = "elasticapm metrics collect timer"
97-
self._collect_timer.daemon = True
92+
self._collect_timer = IntervalTimer(self.collect, timeout, name="eapm metrics collect timer", daemon=True)
9893
logger.debug("Starting metrics collect timer")
9994
self._collect_timer.start()
10095

elasticapm/utils/threading.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
30+
31+
from __future__ import absolute_import
32+
33+
import threading
34+
from timeit import default_timer
35+
36+
37+
class IntervalTimer(threading.Thread):
38+
"""
39+
A timer that runs a function repeatedly. In contrast to threading.Timer,
40+
IntervalTimer runs the given function in perpetuity or until it is cancelled.
41+
When run, it will wait `interval` seconds until the first execution.
42+
"""
43+
44+
def __init__(self, function, interval, name=None, args=(), kwargs=None, daemon=None):
45+
super(IntervalTimer, self).__init__(name=name)
46+
self.daemon = daemon
47+
self._args = args
48+
self._kwargs = kwargs if kwargs is not None else {}
49+
self._function = function
50+
self._interval = interval
51+
self._interval_done = threading.Event()
52+
53+
def run(self):
54+
execution_time = 0
55+
while True:
56+
real_interval = self._interval - execution_time
57+
interval_completed = True
58+
if real_interval:
59+
interval_completed = not self._interval_done.wait(real_interval)
60+
if not interval_completed:
61+
# we've been cancelled, time to go home
62+
return
63+
start = default_timer()
64+
self._function(*self._args, **self._kwargs)
65+
execution_time = default_timer() - start
66+
67+
def cancel(self):
68+
self._interval_done.set()

tests/utils/threading_tests.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
30+
import time
31+
32+
import mock
33+
34+
from elasticapm.utils.threading import IntervalTimer
35+
36+
37+
def test_interval_timer():
38+
func = mock.Mock()
39+
timer = IntervalTimer(function=func, interval=0.1, args=(1,), kwargs={"a": "b"})
40+
timer.start()
41+
time.sleep(0.25)
42+
try:
43+
assert func.call_count == 2
44+
for call in func.call_args_list:
45+
assert call == ((1,), {"a": "b"})
46+
finally:
47+
timer.cancel()
48+
time.sleep(0.05)
49+
assert not timer.is_alive()

0 commit comments

Comments
 (0)