Skip to content

Commit deb4969

Browse files
committed
Improve profiler scheduling
1 parent 1e7d98e commit deb4969

14 files changed

+274
-185
lines changed

graphsignal/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ def configure(
2525
deployment: Optional[str] = None,
2626
tags: Optional[Dict[str, str]] = None,
2727
auto_instrument: Optional[bool] = None,
28-
profiling_rate: Optional[float] = None,
28+
profiles_per_min: Optional[int] = None,
2929
include_profiles: Optional[list] = None,
3030
debug_mode: Optional[bool] = None
3131
) -> None:
@@ -39,7 +39,7 @@ def configure(
3939
api_url = read_config_param("api_url", str, api_url)
4040
tags = read_config_tags(tags)
4141
auto_instrument = read_config_param("auto_instrument", bool, auto_instrument, default_value=True)
42-
profiling_rate = read_config_param("profiling_rate", float, profiling_rate, default_value=0.1)
42+
profiles_per_min = read_config_param("profiles_per_min", int, profiles_per_min)
4343
include_profiles = read_config_param("include_profiles", list, include_profiles)
4444
debug_mode = read_config_param("debug_mode", bool, debug_mode, default_value=False)
4545

@@ -52,7 +52,7 @@ def configure(
5252
api_url=api_url,
5353
tags=tags,
5454
auto_instrument=auto_instrument,
55-
profiling_rate=profiling_rate,
55+
profiles_per_min=profiles_per_min,
5656
include_profiles=include_profiles,
5757
debug_mode=debug_mode)
5858
_tracer.setup()

graphsignal/profiles.py

Lines changed: 0 additions & 45 deletions
This file was deleted.

graphsignal/recorders/python_recorder.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ def _can_include_profiles(self, span, profiles):
2020
span.can_include_profiles(profiles))
2121

2222
def on_span_start(self, span, context):
23-
if (span.sampled() and
24-
self._can_include_profiles(span, ['profile.cpython']) and
25-
graphsignal._tracer.set_profiling_mode()):
23+
if (self._can_include_profiles(span, ['profile.cpython']) and
24+
graphsignal._tracer.set_profiling_mode('profile.cpython')):
2625

2726
context['profiled'] = True
2827

graphsignal/recorders/pytorch_recorder.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ def _can_include_profiles(self, span, profiles):
2525
span.can_include_profiles(profiles))
2626

2727
def on_span_start(self, span, context):
28-
if (span.sampled() and
29-
self._can_include_profiles(span, ['profile.pytorch']) and
30-
graphsignal._tracer.set_profiling_mode()):
28+
if (self._can_include_profiles(span, ['profile.pytorch']) and
29+
graphsignal._tracer.set_profiling_mode('profile.pytorch')):
3130
context['profiled'] = True
3231

3332
if self._torch_prof:

graphsignal/spans.py

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,12 @@ def __init__(self, name, value, unit=None):
5252
class SpanContext:
5353
__slots__ = [
5454
'trace_id',
55-
'span_id',
56-
'sampled'
55+
'span_id'
5756
]
5857

59-
def __init__(self, trace_id=None, span_id=None, sampled=None):
58+
def __init__(self, trace_id=None, span_id=None):
6059
self.trace_id = trace_id
6160
self.span_id = span_id
62-
self.sampled = sampled
6361

6462
@staticmethod
6563
def push_contextvars(ctx):
@@ -80,26 +78,24 @@ def loads(value):
8078
logger.debug(f'SpanContext.loads: invalid context value: {value}')
8179
return None
8280
parts = value.split('-')
83-
if len(parts) < 3:
81+
if len(parts) < 2:
8482
if logger.isEnabledFor(logging.DEBUG):
8583
logger.debug(f'SpanContext.loads: invalid context value: {value}')
8684
return None
8785
ctx = SpanContext()
8886
ctx.trace_id = parts[0]
8987
ctx.span_id = parts[1]
90-
ctx.sampled = parts[2] == '1'
9188
return ctx
9289

9390
@staticmethod
9491
def dumps(ctx):
95-
if ctx is None or ctx.trace_id is None or ctx.span_id is None or ctx.sampled is None:
92+
if ctx is None or ctx.trace_id is None or ctx.span_id is None:
9693
if logger.isEnabledFor(logging.DEBUG):
9794
logger.debug('SpanContext.dumps: invalid context')
9895
return None
99-
return '{0}-{1}-{2}'.format(
96+
return '{0}-{1}'.format(
10097
ctx.trace_id,
101-
ctx.span_id,
102-
'1' if ctx.sampled else '0')
98+
ctx.span_id)
10399

104100
class Span:
105101
MAX_SPAN_TAGS = 25
@@ -120,7 +116,6 @@ class Span:
120116
'_parent_span_id',
121117
'_linked_span_ids',
122118
'_is_root',
123-
'_sampled',
124119
'_recorder_context',
125120
'_model',
126121
'_is_started',
@@ -161,11 +156,9 @@ def __init__(self, operation, tags=None, include_profiles=None, parent_context=N
161156
self._span_id = None
162157
self._trace_id = None
163158
self._parent_span_id = None
164-
self._sampled = None
165159
if parent_context:
166160
self._trace_id = parent_context.trace_id
167161
self._parent_span_id = parent_context.span_id
168-
self._sampled = parent_context.sampled
169162
self._linked_span_ids = None
170163
self._is_root = False
171164
self._start_us = None
@@ -209,6 +202,9 @@ async def __aexit__(self, *exc_info):
209202
self.stop()
210203
return False
211204

205+
def _should_record(self) -> bool:
    """Return True when this span holds at least one profile.

    The span is treated as worth recording only if ``self._profiles`` is
    set and non-empty.
    """
    profiles = self._profiles
    if profiles is None:
        return False
    return len(profiles) > 0
207+
212208
def _start(self):
213209
if self._is_started:
214210
return
@@ -222,9 +218,7 @@ def _start(self):
222218
if self._trace_id is None:
223219
self._trace_id = uuid_sha1(size=12)
224220
self._is_root = True
225-
if self._sampled is None:
226-
self._sampled = fast_rand() <= graphsignal._tracer.sampling_rate
227-
221+
228222
self._context_tags = _tracer().context_tags.get().copy()
229223

230224
self._model = client.Span(
@@ -307,7 +301,16 @@ def _stop(self) -> None:
307301
_tracer().metric_store().inc_counter(
308302
name=metric.name, tags=span_tags, value=metric.value, update_ts=now)
309303

310-
if self._sampled:
304+
# report issues
305+
if self._exc_infos:
306+
for exc_info in self._exc_infos:
307+
_tracer().report_issue(
308+
name='operation.error',
309+
severity=1,
310+
description=f'{exc_info[0].__name__}: {exc_info[1]}',
311+
span=self)
312+
313+
if self._should_record():
311314
# fill and upload span
312315
# copy data to span model
313316
self._model.start_us = self._start_us
@@ -404,8 +407,7 @@ def trace_id(self) -> str:
404407
def get_span_context(self):
405408
return SpanContext(
406409
trace_id=self._trace_id,
407-
span_id=self._span_id,
408-
sampled=self._sampled)
410+
span_id=self._span_id)
409411

410412
def add_linked_span(self, span_id: str) -> None:
411413
if not span_id:
@@ -417,9 +419,6 @@ def add_linked_span(self, span_id: str) -> None:
417419

418420
self._linked_span_ids.append(span_id)
419421

420-
def sampled(self) -> bool:
421-
return self._sampled
422-
423422
def can_include_profiles(self, profiles) -> bool:
424423
if self._include_profiles_index is None:
425424
return True

graphsignal/tracer.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
logger = logging.getLogger('graphsignal')
2323

2424

25+
2526
class GraphsignalTracerLogHandler(logging.Handler):
2627
def __init__(self, tracer):
2728
super().__init__()
@@ -102,6 +103,34 @@ def find_spec(self, fullname, path=None, target=None):
102103

103104
return None
104105

106+
class ProfilingTokenBucket:
    """Token-bucket limiter that decides which profiling requests may run.

    The bucket starts full and is refilled continuously at
    ``sampling_rate_per_minute / 60`` tokens per second, up to ``capacity``.
    Every granted profiling request consumes one token.

    The very first request is always declined, regardless of available
    tokens (presumably so that a warm-up call is not profiled -- confirm
    with the callers).

    NOTE(review): instances are not synchronized; confirm that callers
    serialize access if buckets are shared across threads.
    """

    def __init__(self, sampling_rate_per_minute: float, clock=None):
        """Create a bucket.

        Args:
            sampling_rate_per_minute: Sustained number of profiles allowed
                per minute. Zero or a negative value disables profiling.
            clock: Optional zero-argument callable returning monotonic
                seconds. Defaults to ``time.monotonic``.
        """
        rate = max(sampling_rate_per_minute, 0)
        # A whole token is needed to grant a request, so the bucket must be
        # able to hold at least one token for any positive rate. Without
        # this, a fractional rate (< 1 per minute) would cap the bucket
        # below 1 and no request could ever be granted.
        self.capacity = max(rate, 1) if rate > 0 else 0  # max tokens held
        self.tokens = self.capacity  # start full
        self.refill_rate_per_sec = rate / 60.0  # tokens per second
        self._clock = clock if clock is not None else time.monotonic
        self.last_refill_time = self._clock()
        self._first_request_skipped = False  # set once the first request is declined

    def _refill(self):
        """Add the tokens earned since the last refill, capped at capacity."""
        now = self._clock()
        elapsed = now - self.last_refill_time
        if elapsed > 0:
            earned = elapsed * self.refill_rate_per_sec
            self.tokens = min(self.capacity, self.tokens + earned)
            self.last_refill_time = now

    def should_profile(self) -> bool:
        """Return True and consume a token if profiling is allowed now."""
        # The first profiling request is always declined.
        if not self._first_request_skipped:
            self._first_request_skipped = True
            return False

        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
105134

106135
class Tracer:
107136
TICK_INTERVAL_SEC = 10
@@ -114,8 +143,7 @@ def __init__(
114143
api_url=None,
115144
tags=None,
116145
auto_instrument=True,
117-
sampling_rate=1.0,
118-
profiling_rate=0.1,
146+
profiles_per_min=10,
119147
include_profiles=None,
120148
debug_mode=False):
121149
if debug_mode:
@@ -139,11 +167,12 @@ def __init__(
139167
self.params = {}
140168

141169
self.auto_instrument = auto_instrument
142-
self.sampling_rate = sampling_rate if sampling_rate is not None else 1.0
143-
self.profiling_rate = profiling_rate if profiling_rate is not None else 0
170+
self.profiles_per_min = profiles_per_min if profiles_per_min is not None else 10
144171
self.include_profiles = include_profiles
145172
self.debug_mode = debug_mode
146173

174+
self._profiling_token_buckets = {}
175+
147176
self._tick_timer_thread = None
148177
self._tick_stop_event = threading.Event()
149178
self._tick_lock = threading.Lock()
@@ -272,8 +301,11 @@ def metric_store(self):
272301
def log_store(self):
273302
return self._log_store
274303

275-
def set_profiling_mode(self):
276-
if fast_rand() > self.profiling_rate:
304+
def set_profiling_mode(self, profile_name):
305+
if profile_name not in self._profiling_token_buckets:
306+
self._profiling_token_buckets[profile_name] = ProfilingTokenBucket(self.profiles_per_min)
307+
308+
if not self._profiling_token_buckets[profile_name].should_profile():
277309
return False
278310

279311
with self._profiling_mode_lock:

graphsignal/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.18.0'
1+
__version__ = '0.18.1'

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "graphsignal"
3-
version = "0.18.0"
3+
version = "0.18.1"
44
description = "Graphsignal Tracer for Python"
55
authors = ["Graphsignal, Inc. <devops@graphsignal.com>"]
66
license = "Apache-2.0"

test/recorders/test_instrumentation.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import graphsignal
1313
from graphsignal.recorders.instrumentation import patch_method, trace_method, profile_method, read_args, parse_semver, compare_semver
14+
from graphsignal.spans import Span
1415
from graphsignal.uploader import Uploader
1516
from test.model_utils import find_tag, find_counter
1617

@@ -98,8 +99,9 @@ def profile_func(event_name, duration_ns):
9899
self.assertTrue(measured_duration_ns > 0)
99100

100101

102+
@patch.object(Span, '_should_record', return_value=True)
101103
@patch.object(Uploader, 'upload_span')
102-
async def test_trace_method(self, mocked_upload_span):
104+
async def test_trace_method(self, mocked_upload_span, mocked_should_record):
103105
obj = Dummy()
104106

105107
trace_func_called = False
@@ -116,8 +118,9 @@ def trace_func(span, args, kwargs, ret, exc):
116118
self.assertTrue(trace_func_called)
117119
self.assertEqual(find_tag(model, 'operation.name'), 'op1')
118120

121+
@patch.object(Span, '_should_record', return_value=True)
119122
@patch.object(Uploader, 'upload_span')
120-
async def test_trace_method_generator(self, mocked_upload_span):
123+
async def test_trace_method_generator(self, mocked_upload_span, mocked_should_record):
121124
obj = Dummy()
122125

123126
trace_func_called = None

test/recorders/test_python_recorder.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ def setUp(self):
2323
logger.addHandler(logging.StreamHandler(sys.stdout))
2424
graphsignal.configure(
2525
api_key='k1',
26-
profiling_rate=1,
2726
debug_mode=True)
2827
graphsignal._tracer.auto_export = False
2928

@@ -34,6 +33,10 @@ def tearDown(self):
3433
@patch.object(Tracer, 'emit_span_stop')
3534
@patch.object(Tracer, 'emit_span_read')
3635
def test_record(self, mocked_emit_span_read, mocked_emit_span_stop, mocked_emit_span_start):
36+
# First call will be skipped, second call should succeed
37+
graphsignal._tracer.set_profiling_mode('profile.cpython')
38+
graphsignal._tracer.unset_profiling_mode()
39+
3740
recorder = PythonRecorder()
3841
recorder._exclude_path = 'donotmatchpath'
3942
recorder.setup()

0 commit comments

Comments
 (0)