Skip to content

Commit 55f3b05

Browse files
mariojonke and toumorokoshi
authored and committed
Make force_flush available on SDK's tracer provider (open-telemetry#594)
Co-authored-by: Yusuke Tsutsumi <yusuke@tsutsumi.io>
1 parent 7e457d1 commit 55f3b05

File tree

3 files changed

+463
-7
lines changed

3 files changed

+463
-7
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py

Lines changed: 150 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import abc
1717
import atexit
18+
import concurrent.futures
1819
import json
1920
import logging
2021
import random
@@ -23,7 +24,17 @@
2324
from collections import OrderedDict
2425
from contextlib import contextmanager
2526
from types import TracebackType
26-
from typing import Iterator, MutableSequence, Optional, Sequence, Tuple, Type
27+
from typing import (
28+
Any,
29+
Callable,
30+
Iterator,
31+
MutableSequence,
32+
Optional,
33+
Sequence,
34+
Tuple,
35+
Type,
36+
Union,
37+
)
2738

2839
from opentelemetry import context as context_api
2940
from opentelemetry import trace as trace_api
@@ -90,9 +101,12 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
90101
"""
91102

92103

93-
class MultiSpanProcessor(SpanProcessor):
94-
"""Implementation of :class:`SpanProcessor` that forwards all received
95-
events to a list of `SpanProcessor`.
104+
class SynchronousMultiSpanProcessor(SpanProcessor):
105+
"""Implementation of class:`SpanProcessor` that forwards all received
106+
events to a list of span processors sequentially.
107+
108+
The underlying span processors are called in sequential order as they were
109+
added.
96110
"""
97111

98112
def __init__(self):
@@ -115,9 +129,113 @@ def on_end(self, span: "Span") -> None:
115129
sp.on_end(span)
116130

117131
def shutdown(self) -> None:
132+
"""Sequentially shuts down all underlying span processors.
133+
"""
118134
for sp in self._span_processors:
119135
sp.shutdown()
120136

137+
def force_flush(self, timeout_millis: int = 30000) -> bool:
138+
"""Sequentially calls force_flush on all underlying
139+
:class:`SpanProcessor`
140+
141+
Args:
142+
timeout_millis: The maximum amount of time over all span processors
143+
to wait for spans to be exported. In case the first n span
144+
processors exceeded the timeout followup span processors will be
145+
skipped.
146+
147+
Returns:
148+
True if all span processors flushed their spans within the
149+
given timeout, False otherwise.
150+
"""
151+
deadline_ns = time_ns() + timeout_millis * 1000000
152+
for sp in self._span_processors:
153+
current_time_ns = time_ns()
154+
if current_time_ns >= deadline_ns:
155+
return False
156+
157+
if not sp.force_flush((deadline_ns - current_time_ns) // 1000000):
158+
return False
159+
160+
return True
161+
162+
163+
class ConcurrentMultiSpanProcessor(SpanProcessor):
    """Implementation of :class:`SpanProcessor` that forwards all received
    events to a list of span processors in parallel.

    Calls to the underlying span processors are forwarded in parallel by
    submitting them to a thread pool executor and waiting until each span
    processor finished its work.

    Args:
        num_threads: The number of threads managed by the thread pool executor
            and thus defining how many span processors can work in parallel.
    """

    def __init__(self, num_threads: int = 2):
        # use a tuple to avoid race conditions when adding a new span and
        # iterating through it on "on_start" and "on_end".
        self._span_processors = ()  # type: Tuple[SpanProcessor, ...]
        self._lock = threading.Lock()
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=num_threads
        )

    def add_span_processor(self, span_processor: SpanProcessor) -> None:
        """Adds a SpanProcessor to the list handled by this instance."""
        with self._lock:
            self._span_processors = self._span_processors + (span_processor,)

    def _submit_and_await(
        self, func: Callable[[SpanProcessor], Callable[..., None]], *args: Any
    ):
        # Fan the call out to every processor, then block until all are done
        # so callers observe the same "finished" semantics as the sequential
        # implementation. future.result() also re-raises any exception that
        # occurred inside a processor.
        futures = [
            self._executor.submit(func(sp), *args)
            for sp in self._span_processors
        ]
        for future in futures:
            future.result()

    def on_start(self, span: "Span") -> None:
        self._submit_and_await(lambda sp: sp.on_start, span)

    def on_end(self, span: "Span") -> None:
        self._submit_and_await(lambda sp: sp.on_end, span)

    def shutdown(self) -> None:
        """Shuts down all underlying span processors in parallel."""
        self._submit_and_await(lambda sp: sp.shutdown)
        # Release the pool's worker threads. Without this the non-daemon
        # threads would linger until interpreter exit (thread leak). All
        # submitted work has already completed above, so this returns
        # promptly.
        self._executor.shutdown(wait=True)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Calls force_flush on all underlying span processors in parallel.

        Args:
            timeout_millis: The maximum amount of time to wait for spans to be
                exported.

        Returns:
            True if all span processors flushed their spans within the given
            timeout, False otherwise.
        """
        futures = []
        for sp in self._span_processors:  # type: SpanProcessor
            future = self._executor.submit(sp.force_flush, timeout_millis)
            futures.append(future)

        timeout_sec = timeout_millis / 1e3
        done_futures, not_done_futures = concurrent.futures.wait(
            futures, timeout_sec
        )
        if not_done_futures:
            # At least one processor did not finish in time. Its future keeps
            # running in the pool, but the flush is reported as failed.
            return False

        # All futures completed; the flush succeeded only if every processor
        # reported success.
        for future in done_futures:
            if not future.result():
                return False

        return True
238+
121239

122240
class EventBase(abc.ABC):
123241
def __init__(self, name: str, timestamp: Optional[int] = None) -> None:
@@ -742,8 +860,13 @@ def __init__(
742860
sampler: sampling.Sampler = trace_api.sampling.ALWAYS_ON,
743861
resource: Resource = Resource.create_empty(),
744862
shutdown_on_exit: bool = True,
863+
active_span_processor: Union[
864+
SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor
865+
] = None,
745866
):
746-
self._active_span_processor = MultiSpanProcessor()
867+
self._active_span_processor = (
868+
active_span_processor or SynchronousMultiSpanProcessor()
869+
)
747870
self.resource = resource
748871
self.sampler = sampler
749872
self._atexit_handler = None
@@ -771,8 +894,8 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
771894
The span processors are invoked in the same order they are registered.
772895
"""
773896

774-
# no lock here because MultiSpanProcessor.add_span_processor is
775-
# thread safe
897+
# no lock here because add_span_processor is thread safe for both
898+
# SynchronousMultiSpanProcessor and ConcurrentMultiSpanProcessor.
776899
self._active_span_processor.add_span_processor(span_processor)
777900

778901
def shutdown(self):
@@ -781,3 +904,23 @@ def shutdown(self):
781904
if self._atexit_handler is not None:
782905
atexit.unregister(self._atexit_handler)
783906
self._atexit_handler = None
907+
908+
def force_flush(self, timeout_millis: int = 30000) -> bool:
909+
"""Requests the active span processor to process all spans that have not
910+
yet been processed.
911+
912+
By default force flush is called sequentially on all added span
913+
processors. This means that span processors further back in the list
914+
have less time to flush their spans.
915+
To have span processors flush their spans in parallel it is possible to
916+
initialize the tracer provider with an instance of
917+
`ConcurrentMultiSpanProcessor` at the cost of using multiple threads.
918+
919+
Args:
920+
timeout_millis: The maximum amount of time to wait for spans to be
921+
processed.
922+
923+
Returns:
924+
False if the timeout is exceeded, True otherwise.
925+
"""
926+
return self._active_span_processor.force_flush(timeout_millis)

0 commit comments

Comments (0)