
When using the OpenTelemetry SDK to export traces, performance declines significantly under multi-threaded concurrency #3663

Closed
HustThulium opened this issue Feb 2, 2024 · 1 comment · Fixed by #3763
Labels
bug Something isn't working


@HustThulium

Describe your environment
[Python Version] Python 3.8
[OS] Windows 10
[platform] x86_64
[opentelemetry-python] 1.21.0

Steps to reproduce

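1. Initialize a simple provider:
import threading
import time
from opentelemetry import trace
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import SynchronousMultiSpanProcessor, TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import set_tracer_provider

tracer = trace.get_tracer("name", "version", None, "url")

# set_service_info() and trace_resource() are the reporter's own helpers (not shown);
# this Resource is an assumed stand-in carrying the same service information
resource = Resource.create(
    {"service.name": "server_name", "service.version": "server_version", "service.instance.id": "server_instance"}
)
memory_exporter = InMemorySpanExporter()
trace_processor = SynchronousMultiSpanProcessor()
trace_processor.add_span_processor(
    span_processor=BatchSpanProcessor(
        span_exporter=memory_exporter,
        schedule_delay_millis=2000,
        max_queue_size=1000000,
        max_export_batch_size=400
    )
)
trace_provider = TracerProvider(resource=resource, active_span_processor=trace_processor)
set_tracer_provider(tracer_provider=trace_provider)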
2. Define a demo function:
def func(threadName, a, b):
    with tracer.start_as_current_span("%s%d" % (threadName, 0)):
        for i1 in range(3):
            with tracer.start_as_current_span("%s%d" % (threadName, 0)):
                for j1 in range(10):
                    with tracer.start_as_current_span("%s%d%d" % (threadName, i1, j1)):
                        c = a + b

def test(threadName):
    start_time = time.time()
    for i in range(1000):
        func(threadName, 1, 2)
    end_time = time.time()
    print("%s %.2fs" % (threadName, end_time - start_time))
    return end_time - start_time
3. Run the test:
t1 = threading.Thread(target=test, name="t1", args=("t1", ))
t2 = threading.Thread(target=test, name="t2", args=("t2", ))
t3 = threading.Thread(target=test, name="t3", args=("t3", ))
t4 = threading.Thread(target=test, name="t4", args=("t4", ))
t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

time.sleep(1)

t = 0
t += test("t1")
t += test("t2")
t += test("t3")
t += test("t4")
print("total: %.2f" % t)
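For scale, it helps to count how many spans this workload creates. The helper below is a hypothetical sketch (not part of the original script) that mirrors the loop structure of `func` and `test` above: one root span, then for each of the 3 outer iterations one child span plus 10 grandchild spans.

```python
def spans_per_call(outer: int = 3, inner: int = 10) -> int:
    # 1 root span + per outer iteration: 1 child span + `inner` grandchild spans
    return 1 + outer * (1 + inner)


CALLS_PER_TEST = 1000  # test() calls func() 1000 times
THREADS = 4            # t1..t4 each run test() once

per_test = spans_per_call() * CALLS_PER_TEST
per_phase = per_test * THREADS

print(spans_per_call(), per_test, per_phase)  # 34 34000 136000
```

The script runs this workload twice (once with four concurrent threads, once sequentially), so about 272,000 spans pass through the BatchSpanProcessor in total, which stays below max_queue_size=1000000. Assuming, as the report states, that the locking wrapper runs at least once per span, each thread performs at least 34,000 lock acquisitions per test run.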

What is the expected behavior?
Performance should be comparable whether spans are created from a single thread or from multiple threads concurrently.

What is the actual behavior?
Performance declines significantly when spans are created concurrently from multiple threads.

Additional context
Relevant source: https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-api/src/opentelemetry/context/__init__.py

def _load_runtime_context(func: _F) -> _F:
    """A decorator used to initialize the global RuntimeContext

    Returns:
        A wrapper of the decorated method.
    """

    @wraps(func)  # type: ignore[misc]
    def wrapper(  # type: ignore[misc]
        *args: typing.Tuple[typing.Any, typing.Any],
        **kwargs: typing.Dict[typing.Any, typing.Any],
    ) -> typing.Optional[typing.Any]:
        global _RUNTIME_CONTEXT  # pylint: disable=global-statement

        with _RUNTIME_CONTEXT_LOCK:
            if _RUNTIME_CONTEXT is None:
                # FIXME use a better implementation of a configuration manager
                # to avoid having to get configuration values straight from
                # environment variables
                default_context = "contextvars_context"

                configured_context = environ.get(
                    OTEL_PYTHON_CONTEXT, default_context
                )  # type: str
                try:

                    _RUNTIME_CONTEXT = next(  # type: ignore
                        iter(  # type: ignore
                            entry_points(  # type: ignore
                                group="opentelemetry_context",
                                name=configured_context,
                            )
                        )
                    ).load()()

                except Exception:  # pylint: disable=broad-except
                    logger.exception(
                        "Failed to load context: %s", configured_context
                    )
        return func(*args, **kwargs)  # type: ignore[misc]

    return typing.cast(_F, wrapper)  # type: ignore[misc]

with _RUNTIME_CONTEXT_LOCK:

This thread lock is the cause of the performance decline.

The wrapper function runs EACH TIME a span is created.

The lock exists only to initialize the _RUNTIME_CONTEXT object exactly ONCE.
After a successful initialization, _RUNTIME_CONTEXT is never None again.

So in almost every call, the wrapper acquires the lock, finds nothing to do, and releases it.
Under multi-threaded concurrency, every thread contends for this single global lock on every span, and the repeated lock and unlock operations cause the slowdown.

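# Fast path: once the context is loaded, skip the lock entirely
if _RUNTIME_CONTEXT is None:
    with _RUNTIME_CONTEXT_LOCK:
        # Re-check under the lock: another thread may have loaded it in the meantime
        if _RUNTIME_CONTEXT is None:
            ...

Adding this check before acquiring the lock makes multi-threaded performance comparable to single-threaded mode.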

@aabmass
Member

aabmass commented Apr 18, 2024

Thanks for the report; I was able to reproduce your results. Here is the full script if others want to try it:

Full Python script

import time
import threading

from opentelemetry import trace
from opentelemetry.sdk.trace import (
    SynchronousMultiSpanProcessor,
    TracerProvider,
)
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
    InMemorySpanExporter,
)

tracer = trace.get_tracer("name", "version", None, "url")

memory_exporter = InMemorySpanExporter()
trace_processor = SynchronousMultiSpanProcessor()
trace_processor.add_span_processor(
    span_processor=BatchSpanProcessor(
        span_exporter=memory_exporter,
        schedule_delay_millis=2000,
        max_queue_size=1000000,
        max_export_batch_size=400,
    )
)
trace_provider = TracerProvider(active_span_processor=trace_processor)
trace.set_tracer_provider(tracer_provider=trace_provider)


def func(threadName: str, a: int, b: int):
    with tracer.start_as_current_span("%s%d" % (threadName, 0)):
        for i1 in range(3):
            with tracer.start_as_current_span("%s%d" % (threadName, 0)):
                for j1 in range(10):
                    with tracer.start_as_current_span(
                        "%s%d%d" % (threadName, i1, j1)
                    ):
                        c = a + b


def test(threadName: str):
    start_time = time.time()
    for i in range(1000):
        func(threadName, 1, 2)
    end_time = time.time()
    print("%s %.2fs" % (threadName, end_time - start_time))
    return end_time - start_time


t1 = threading.Thread(target=test, name="t1", args=("t1",))
t2 = threading.Thread(target=test, name="t2", args=("t2",))
t3 = threading.Thread(target=test, name="t3", args=("t3",))
t4 = threading.Thread(target=test, name="t4", args=("t4",))
t1.start()
t2.start()
t3.start()
t4.start()

t1.join()
t2.join()
t3.join()
t4.join()

time.sleep(1)

t = 0
t += test("t1")
t += test("t2")
t += test("t3")
t += test("t4")
print("total: %.2f" % t)

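Output on my system with Python 3.11.8 (in each run, the first four lines come from the concurrent threads and the last four from the sequential calls):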

# HEAD
$ python bench.py 
t1 16.61s
t4 17.40s
t2 17.47s
t3 17.53s
t1 1.33s
t2 1.39s
t3 1.54s
t4 0.86s
total: 5.13

# With the if guard
$ python bench.py 
t3 4.45s
t2 4.66s
t4 4.69s
t1 4.85s
t1 1.22s
t2 1.28s
t3 1.47s
t4 0.83s
total: 4.80

@aabmass aabmass linked a pull request Apr 19, 2024 that will close this issue