Make force_flush available on SDK's tracer provider #594

Merged

Conversation

mariojonke
Contributor

Since it is currently not (easily) possible to call force flush on the span processor(s), add a function to the SDK's tracer provider that delegates to the active span processor (MultiSpanProcessor).
Also implement MultiSpanProcessor's force flush function to sequentially call force flush on the known span processors while respecting the given timeout.
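A minimal sketch of the delegation being described (the _active_span_processor attribute name is assumed from the SDK; the merged code may differ in detail):

def force_flush(self, timeout_millis: int = 30000) -> bool:
    # TracerProvider.force_flush: delegate to the active span processor,
    # which fans the call out to all registered span processors.
    return self._active_span_processor.force_flush(timeout_millis)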

* add the force_flush function on the SDK's tracer provider,
  since it is currently not easily possible to call force flush
  because the span processor(s) are not accessible.
@mariojonke requested a review from a team April 17, 2020 05:28
flushed_within_time = sp.force_flush() and flushed_within_time
continue

remaining_time_millis = (
Member

I think it would be better to calculate deadline = time_ns() + timeout_millis * 1000000 first and then compare against that, instead of repeatedly subtracting.

Contributor Author

Without subtracting, it might lead to an overall timeout of about 2 times the given timeout_millis.
E.g. with 2 span processors in the MultiSpanProcessor and the 1st one flushing for almost timeout_millis, the deadline wouldn't have been reached yet and the 2nd span processor would start flushing with a timeout of timeout_millis.

Member

For the timeout provided to the force_flush calls made here, you can just use deadline - time_ns(). I guess you could also call that "repeatedly subtracting", but that way it does not accumulate errors from reusing the results of previous subtractions.
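Put together, the suggested pattern looks roughly like this (a sketch, assuming the SDK's time_ns() helper, which the later diff also uses):

deadline_ns = time_ns() + timeout_millis * 1000000
for sp in self._span_processors:
    # recompute the remaining budget from the fixed deadline each
    # iteration, so errors from chained subtractions cannot accumulate
    remaining_millis = (deadline_ns - time_ns()) // 1000000
    if not sp.force_flush(remaining_millis):
        return False
return True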

Returns:
False if the timeout is exceeded, True otherwise.
"""
if timeout_millis is None:
Member

Is this if required? Can't we call force_flush(timeout_millis) in both cases? I mean, technically there can be a difference if a function forgot to specify a default argument or has a different one, but should we support that?

@Oberon00
Member

Note that the same functionality was recently merged for Java: open-telemetry/opentelemetry-java#1068

Member

@mauriciovasquezbernal left a comment

I think the idea makes sense, but we should unify the different force_flush interfaces. I think having an infinite timeout by default is not the best choice; it'd be better to have some reasonable default. About avoiding the infinite timeout, I leave it to you: if you think it's easier to implement in the other places, go for it!

flush_start_ns = time_ns()
for sp in self._span_processors:
if timeout_millis is None:
flushed_within_time = sp.force_flush() and flushed_within_time


A timeout_millis of None should indicate that force_flush could block forever, but force_flush in the span processor has a default timeout of 30 seconds.

I think we should avoid providing an option for not having a timeout or update the span processors to implement it.

Contributor Author

My initial idea was to allow individual default timeouts for each span processor in MultiSpanProcessor (e.g. someone implementing their own span processor), but I guess that just makes it more confusing.
I'll go with adding a default (not None) timeout.

@@ -674,3 +693,18 @@ def shutdown(self):
if self._atexit_handler is not None:
atexit.unregister(self._atexit_handler)
self._atexit_handler = None

def force_flush(self, timeout_millis: int = None) -> bool:


If we want to allow None, the annotation should be typing.Optional[int].
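For example, the annotation being asked for (a sketch):

def force_flush(self, timeout_millis: typing.Optional[int] = None) -> bool:
    ...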

Member

@mauriciovasquezbernal left a comment

It's almost good to go.
Just a comment to simplify the logic in the loop; I also think @Oberon00's suggestion about the deadline is a nice one.

Comment on lines 126 to 130
flushed_within_time = (
sp.force_flush(remaining_time_millis) and flushed_within_time
)

return flushed_within_time


What about?

Suggested change
-        flushed_within_time = (
-            sp.force_flush(remaining_time_millis) and flushed_within_time
-        )
-    return flushed_within_time
+        if not sp.force_flush(remaining_time_millis):
+            return False
+    return True

Member

@mauriciovasquezbernal left a comment

LGTM!

@mauriciovasquezbernal added the sdk and needs reviewers labels Apr 23, 2020
Member

@toumorokoshi left a comment

Thanks for the PR! I think it's important to consider some form of threading here to see if processing can be parallelized.

Also is there a specific use case you're working toward for this PR? Wondering what a real-life scenario looks like for a MultiSpanProcessor.

@@ -114,6 +114,18 @@ def shutdown(self) -> None:
for sp in self._span_processors:
sp.shutdown()

def force_flush(self, timeout_millis: int = 30000) -> bool:
deadline_ns = time_ns() + timeout_millis * 1000000
for sp in self._span_processors:
Member

Thoughts on this approach vs. something like spawning multiple threads to allow the span processors to flush concurrently?

Force flush will typically fire the exporter downstream, which in many cases means a network call (e.g. DataDog, Stackdriver, or Jaeger). Doing this synchronously, you may find the timeout is reached just from the amount of time it takes to submit to one of the exporters. Also, this approach gives a strong preference to the first span processor's success over later ones.

Might be good to document that if it's the intended behavior. Right now there's no mention of preference in failure scenarios.

Member

@Oberon00 Apr 25, 2020

It is true that threading might be helpful here. But then again:

  1. Spawning threads makes everything more complicated.
  2. shutdown has the same problem as force_flush; it just lacks timeout support.

I think if we want to add the functionality you suggest, we should add some ConcurrentMultiSpanProcessor, AsyncMultiSpanProcessor, etc. instead of making the currently very simple MultiSpanProcessor more complicated. EDIT: Also, I expect the most common case will be to have just one SpanProcessor, or maybe two, one of which is a file-based / console-logging exporter. In both cases, the overhead of spawning threads would probably not be worth it.

Contributor Author

@toumorokoshi, it's not that I had a concrete use case for the MultiSpanProcessor in mind when making this PR. It's just that you have to go through the MultiSpanProcessor when you want to make force_flush available on the TracerProvider.
I'll add some documentation to the MultiSpanProcessor and the TracerProvider.

About the parallelism, I'd picture a solution of either having a thread pool executor in the ...MultiSpanProcessor or spawning new threads on every call to force_flush (and shutdown). Since force_flush on the TracerProvider itself can be called from multiple threads, I'm leaning towards the approach of having a thread pool executor with a continuous but fixed number of threads.
Either way, since this will introduce additional overhead, I agree with @Oberon00 that it should be implemented in a separate ...MultiSpanProcessor class.
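A rough sketch of the thread pool variant under discussion (names assumed; the eventual ConcurrentMultiSpanProcessor may differ in detail):

from concurrent.futures import ThreadPoolExecutor

class ConcurrentMultiSpanProcessor(SpanProcessor):
    def __init__(self, num_threads: int = 2):
        self._span_processors = ()
        self._executor = ThreadPoolExecutor(max_workers=num_threads)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # run one flush per span processor on the pool so they proceed
        # in parallel instead of eating into each other's time budget
        futures = [
            self._executor.submit(sp.force_flush, timeout_millis)
            for sp in self._span_processors
        ]
        # gather the results; see the later review discussion for how to
        # bound the total wait time with futures.wait
        return all(future.result() for future in futures)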

Member

@mariojonke fair enough. I think we may have to revisit the timeout behavior, but I'm fine with waiting until there's a clearer example of where this type of stacked timeout has issues.

Also fine with maybe having a synchronous vs. async one. Here's a tracking ticket to rename the existing one to "Synchronous": #622

* introduce ConcurrentMultiSpanProcessor which manages a thread
  pool executor to parallelize calls to the underlying span
  processors.
* make the active span processor in the SDK's tracer provider
  configurable to allow flushing spans in parallel.
  By default use the synchronous multi span processor.
* split/move span processor tests to separate test module/file
Member

@toumorokoshi left a comment

A couple of small comments. I think if it's a time-priority issue I can approve, but there are a few refactors I think will help a lot with long-term maintenance.

for mock_processor in mocks:
multi_processor.add_span_processor(mock_processor)

multi_processor.on_start(mock.Mock(spec=trace.Span))
Member

could use DefaultSpan if you don't need to observe the object itself.
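For instance (a sketch, assuming the API's DefaultSpan and INVALID_SPAN_CONTEXT and a trace_api import alias):

from opentelemetry import trace as trace_api

# a pre-built no-op span, since the test never inspects the object itself
multi_processor.on_start(trace_api.DefaultSpan(trace_api.INVALID_SPAN_CONTEXT))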

return True


class ConcurrentMultiSpanProcessor(SpanProcessor):
Member

woah! awesome, thank you.

"""

def __init__(self, num_threads: int = 2):
# use a tuple to avoid race conditions when adding a new span and
Member

Is this true? IIRC appending to a list is thread safe. I'm not sure what safety the tuple brings for iteration purposes.

Contributor Author

I took this one from the (Synchronous)MultiSpanProcessor. I think in CPython a list is thread safe due to the global interpreter lock, but that might not be the case for other Python interpreter implementations.


The idea behind the initial implementation was to avoid using locks in on_start and on_end, as these are called for every span in the system. I based my decision on the following considerations, some of which could be wrong:

  • Appending to a list is not thread safe in all implementations (it is in CPython because of the GIL).
  • Variable assignment is atomic in all implementations.

I wasn't able to find any authoritative answer about this; much of the documentation around is CPython specific, and AFAIU the Python specification doesn't state anything about the atomicity of operations, so the final decision is left to the Python implementation.

It'd be very nice if somebody were able to clarify all these details, to understand whether the current implementation is 100% safe or not.
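For reference, the copy-on-write pattern being debated looks roughly like this (a sketch; lock and attribute names assumed):

def add_span_processor(self, span_processor):
    with self._lock:
        # rebind the attribute to a new, longer tuple; on_start/on_end
        # iterate over whatever snapshot they observed, relying on
        # attribute assignment being atomic rather than on a per-span lock
        self._span_processors = self._span_processors + (span_processor,)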

try:
timeout_sec = (deadline_ns - time_ns()) / 1e9
flushed_in_time = (
future.result(timeout_sec) and flushed_in_time
Member

I don't believe this will have the intended effect.

Future.result will block and time out for up to timeout_sec per call. As a result, if you have 2 futures, you'll actually be blocked for up to n * timeout_sec:

  1. future.result(timeout_sec)
  2. doesn't time out, but finishes just under timeout_sec (~timeout_sec time has passed)
  3. future.result(timeout_sec) #2
  4. doesn't time out, but finishes just under timeout_sec (~timeout_sec * 2 time has passed)

and so on.

I believe map may be a more appropriate choice here: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.map

Member

Never mind, I see the timeout_sec is recalculated every round. I believe this will work, but you could simplify the logic significantly by just passing all the futures to a wait function: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait
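A sketch of that simplification (names such as _executor and deadline_ns assumed from the surrounding diff):

from concurrent.futures import wait

futures = [
    self._executor.submit(sp.force_flush, timeout_millis)
    for sp in self._span_processors
]
timeout_sec = (deadline_ns - time_ns()) / 1e9
done, not_done = wait(futures, timeout_sec)
if not_done:
    # at least one span processor missed the shared deadline
    return False
# every future completed; fail if any individual flush reported failure
return all(future.result() for future in done)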

):
-        self._active_span_processor = MultiSpanProcessor()
+        self._active_span_processor = (
+            SynchronousMultiSpanProcessor()
Member

This could be self._active_span_processor = active_span_processor or SynchronousMultiSpanProcessor(), taking advantage of Python's falsy values.

@@ -727,8 +852,8 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
The span processors are invoked in the same order they are registered.
"""

-        # no lock here because MultiSpanProcessor.add_span_processor is
-        # thread safe
+        # no lock here because add_span_processor is thread safe for both
Member

woo! great.

mock_processor2 = mock.Mock(spec=trace.SpanProcessor)
multi_processor.add_span_processor(mock_processor2)

with (
Member

This test is fairly implementation dependent. That works here, but it's brittle to the implementation details of the code. You could test this by creating a SpanProcessor that uses time.sleep for a specified period (maybe 50 ms), and then having a span processor configured with an aggressive timeout.
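Something along these lines (a sketch; DelayingSpanProcessor is a hypothetical helper, not part of the SDK):

import time
from opentelemetry.sdk import trace

class DelayingSpanProcessor(trace.SpanProcessor):
    # sleeps through force_flush, deliberately ignoring the timeout argument
    def __init__(self, delay_sec):
        self._delay_sec = delay_sec

    def force_flush(self, timeout_millis=30000):
        time.sleep(self._delay_sec)
        return True

# in a test: a 10 ms budget cannot cover a 50 ms flush
multi_processor = trace.ConcurrentMultiSpanProcessor(num_threads=2)
multi_processor.add_span_processor(DelayingSpanProcessor(0.05))
assert not multi_processor.force_flush(timeout_millis=10)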

# let the thread executing the late_mock continue
wait_event.set()

self.assertFalse(flushed)
Member

This assert could move to line 290?

Contributor Author

I think this could become troublesome if the assert actually fails, because the ThreadPoolExecutor joins all its threads in an atexit handler. So this would cause the test process to run indefinitely, since no one is setting the wait_event.

self.assertListEqual(spans_calls_list, expected_list)


class TestSynchronousMultiSpanProcessor(unittest.TestCase):
Member

I'm wondering if we really need two test classes, one per span processor. Seems like they should behave in an almost identical manner, except for how they deal with timeouts.

Thoughts on simplifying the test suite here and consolidating the common tests into a base class?

* instead of waiting on every future and recalculating the timeout,
  use the existing futures.wait method.
* consolidate common multi span processor tests into a separate
  base class
@mauriciovasquezbernal dismissed their stale review May 6, 2020 16:52

PR has changed and original review is not valid anymore.

@mauriciovasquezbernal
Member

I'm not sure about the added value of ConcurrentMultiSpanProcessor. As a bit of context, the purpose of having a multi span processor is to allow passing a single span processor instance to the spans, so we can avoid all the boilerplate of handling multiple span processors there.

I'd expect that in a production environment the different span processors will be asynchronous, BatchSpanProcessor or similar ones. Given that, a sequential implementation of the multi span processor will work just fine because the calls to on_start and on_end are not blocking. With ConcurrentMultiSpanProcessor we are adding another level of concurrency logic that IMO will not provide any benefit if the underlying processors are asynchronous.

I know this new implementation was created because force_flush is a blocking operation, so the last processor will have less time to process it. I'm wondering if a different solution could be to add a non-blocking force_flush implementation to the different span processors. Something like:

# multispanprocessor
def force_flush(self):
    futures = []

    for sp in self._span_processors:
        future = sp.force_flush_async()
        futures.append(future)

    # wait on all futures, e.g. with concurrent.futures.wait(futures)

@toumorokoshi
Member

@mauriciovasquezbernal that works somewhat, although one design decision we made was having the timeout / retry logic live outside the exporter (which could also be argued one way or the other).

You bring up a good point about potentially introducing multiple levels of concurrency: I don't think it's the end of the world for a thread to spawn another thread needlessly, but it would be good to avoid if possible.

Looking at our examples, the SpanProcessors don't spawn any threads, so I presume you're referring to the exporters that may spawn them. I don't see any of the exporters spawning a thread explicitly, although Jaeger's reliance on Thrift may be doing so.

@toumorokoshi
Member

Regarding adding a new API: I'd argue that we stick to one function for flushing. One nice thing about async is that it can be made nearly synchronous just by blocking on the method.
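For instance, the hypothetical force_flush_async proposed above could be consumed synchronously just by blocking on its future (a sketch):

future = sp.force_flush_async()  # hypothetical non-blocking API from the comment above
flushed = future.result(timeout=timeout_millis / 1000.0)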

Member

@toumorokoshi left a comment

LGTM, thanks!

@mauriciovasquezbernal
Member

@toumorokoshi the BatchSpanProcessor has a worker thread that does almost all the exporting work. It's for that reason that I'm not totally convinced about having a concurrent multi span processor implementation; I think the main purpose of the multi span processor is to provide an easy way to invoke multiple span processors, and any concurrent logic should be implemented in the individual span processors.

I was suggesting force_flush_async because the current implementation is blocking, so the multi span processor cannot call it concurrently.

@codeboten added this to the Beta v0.9 milestone Jun 4, 2020
Contributor

@lzchen left a comment

Awesome work.

@toumorokoshi
Member


Hey Mauricio, I added #783 to make sure we discuss this before GA. I'm going to merge this in, because at least we'll provide the force_flush function that is needed for the MultiSpanProcessor.


@toumorokoshi removed the needs reviewers label Jun 6, 2020
@codeboten merged commit db27c7c into open-telemetry:master Jun 10, 2020
codeboten pushed a commit to codeboten/opentelemetry-python that referenced this pull request Jun 11, 2020
Co-authored-by: Yusuke Tsutsumi <yusuke@tsutsumi.io>
@mariojonke deleted the tracer_provider_force_flush branch June 24, 2020 05:33