Description
Describe your environment
OS: SLES15SP5 (Docker)
Python version: 3.10.15
SDK version: 1.28.2
API version: 1.28.2
opentelemetry-exporter-otlp-proto-common 1.28.2
opentelemetry-exporter-otlp-proto-grpc 1.28.2
opentelemetry-instrumentation 0.49b2
opentelemetry-instrumentation-dbapi 0.49b2
opentelemetry-instrumentation-elasticsearch 0.49b2
opentelemetry-instrumentation-flask 0.49b2
opentelemetry-instrumentation-kafka-python 0.49b2
opentelemetry-instrumentation-logging 0.49b2
opentelemetry-instrumentation-mysql 0.49b2
opentelemetry-instrumentation-pymysql 0.49b2
opentelemetry-instrumentation-wsgi 0.49b2
opentelemetry-proto 1.28.2
opentelemetry-semantic-conventions 0.49b2
opentelemetry-util-http 0.49b2
gunicorn 22.0.0
flask 3.1.0
gevent 24.11.1
What happened?
We started seeing request timeouts and a memory leak a few months ago, but did not discover the root cause until recently. After enabling the --max-requests
flag in Gunicorn, we found that requests were getting stuck while trying to record metrics. It appears that there is a deadlock within OpenTelemetry when running a Flask/Gunicorn REST API with gevent workers.
I also suspect this is causing a memory leak, where stuck gevent greenlets slowly pile up waiting on the metric reader's lock. This does not happen when testing against Flask directly.
The following gevent exception is raised when the stuck greenlets are killed after Gunicorn restarts the worker:
Unhandled error
Traceback (most recent call last):
...
File "/hostname/projects/...", line 103, in record_metric
counter.add(1, {
File "conda_env/lib/python3.10/site-packages/opentelemetry/metrics/_internal/instrument.py", line 206, in add
self._real_instrument.add(amount, attributes)
File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/instrument.py", line 163, in add
self._measurement_consumer.consume_measurement(
File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/measurement_consumer.py", line 82, in consume_measurement
reader_storage.consume_measurement(measurement)
File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py", line 117, in consume_measurement
for view_instrument_match in self._get_or_init_view_instrument_match(
File "conda_env/lib/python3.10/site-packages/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py", line 87, in _get_or_init_view_instrument_match
with self._lock:
File "conda_env/lib/python3.10/threading.py", line 168, in acquire
rc = self._block.acquire(blocking, timeout)
File "conda_env/lib/python3.10/site-packages/gevent/thread.py", line 121, in acquire
acquired = BoundedSemaphore.acquire(self, blocking, timeout)
File "src/gevent/_semaphore.py", line 180, in gevent._gevent_c_semaphore.Semaphore.acquire
File "src/gevent/_semaphore.py", line 249, in gevent._gevent_c_semaphore.Semaphore.acquire
File "src/gevent/_abstract_linkable.py", line 521, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait
File "src/gevent/_abstract_linkable.py", line 487, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
File "src/gevent/_abstract_linkable.py", line 490, in gevent._gevent_c_abstract_linkable.AbstractLinkable._wait_core
File "src/gevent/_abstract_linkable.py", line 442, in gevent._gevent_c_abstract_linkable.AbstractLinkable._AbstractLinkable__wait_to_be_notified
File "src/gevent/_abstract_linkable.py", line 451, in gevent._gevent_c_abstract_linkable.AbstractLinkable._switch_to_hub
File "src/gevent/_greenlet_primitives.py", line 61, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_greenlet_primitives.py", line 65, in gevent._gevent_c_greenlet_primitives.SwitchOutGreenletWithLoop.switch
File "src/gevent/_gevent_c_greenlet_primitives.pxd", line 35, in gevent._gevent_c_greenlet_primitives._greenlet_switch
greenlet.GreenletExit
When we commented out the calls that increment the counters, the requests stopped timing out.
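A possible stop-gap on the application side (not a fix for the underlying lock contention) would be to guard the add() calls with a gevent timeout, so a request drops the measurement instead of hanging. A rough sketch, assuming the worker is already monkey-patched by gevent; the helper name and the one-second timeout are illustrative:

import gevent

def record_safely(counter, amount, attributes, timeout=1.0):
    # Drop the measurement rather than let the request greenlet block
    # indefinitely on the metric reader's internal lock.
    try:
        with gevent.Timeout(timeout):
            counter.add(amount, attributes)
    except gevent.Timeout:
        pass  # measurement lost, but the request completes

The counters in the repro below would then be recorded via record_safely(call_counter, 1, {...}) instead of calling add() directly.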
Steps to Reproduce
Unfortunately, it is not easy to reproduce. Create a simple Flask/Gunicorn server, run it with gevent workers, and send requests until the deadlock happens.
entrypoint.py
from flask import Flask
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import Span
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics.export import (
PeriodicExportingMetricReader,
ConsoleMetricExporter
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
import logging
import logging.config
import os
from route import bp
logger = logging.getLogger(__name__)
flask_instrumentor: FlaskInstrumentor = FlaskInstrumentor()
def configure_app(initialize_otel: bool = True):
# setup open telemetry when running directly (gunicorn is handled in post_fork)
if initialize_otel:
OTEL_ENDPOINT = 'FILL IN URL'
resource = Resource(
{
ResourceAttributes.SERVICE_NAME: 'backend',
ResourceAttributes.SERVICE_VERSION: '1.0',
}
)
# setup open telemetry tracer provider
tracer_provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(OTEL_ENDPOINT, insecure=True)
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(tracer_provider)
metric_exporter = OTLPMetricExporter(OTEL_ENDPOINT, insecure=True)
console_metric_exporter = ConsoleMetricExporter()
meter_provider = MeterProvider(
resource=resource,
metric_readers=[
PeriodicExportingMetricReader(console_metric_exporter),
# Enable this if you want
# PeriodicExportingMetricReader(metric_exporter),
]
)
metrics.set_meter_provider(meter_provider)
app = Flask(__name__)
app.register_blueprint(bp)
flask_instrumentor.instrument_app(app)
return app
def create_gunicorn(*_args, **kwargs):
app = configure_app(False)
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger.handlers.extend(gunicorn_logger.handlers)
return app
def create_flask(*_args, **kwargs):
app = configure_app()
return app
if __name__ == '__main__':
application = configure_app()
application.run(host='0.0.0.0', port=8000, debug=True)
route.py
from flask import Blueprint
from opentelemetry import trace, metrics
tracer = trace.get_tracer(__name__)
meter = metrics.get_meter(__name__)
call_counter = meter.create_counter('call_counter')
call_counter_2 = meter.create_counter('call_counter_2')
bp = Blueprint('test', __name__, url_prefix='/test')
@bp.route('/', methods=['GET'])
def test():
call_counter.add(1, {
'custom_attribute': 'foo'
})
return 'Ok'
@bp.route('/2', methods=['GET'])
def test2():
call_counter_2.add(1, {
'custom_attribute': 'foo'
})
return 'Ok'
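For anyone trying to confirm the same behaviour, greenlet stacks can be dumped from inside a running worker; blocked greenlets show the self._lock acquire from the traceback above. A minimal diagnostic sketch added to route.py (the route path and function name are illustrative and purely for debugging):

from gevent.util import format_run_info

@bp.route('/debug/greenlets', methods=['GET'])
def dump_greenlets():
    # format_run_info() returns formatted stacks for every thread and greenlet
    # in this worker, so stuck greenlets and the frame they are waiting in are
    # visible without restarting anything.
    return '\n'.join(format_run_info()), 200, {'Content-Type': 'text/plain'}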
gunicorn.conf.py
######################################################
# THIS MUST COME BEFORE ANY MODULE IMPORTS
# Otherwise otel will block if the collector is down
######################################################
# prevents warning when preloading https://github.com/benoitc/gunicorn/issues/2796
try:
import gevent.monkey
gevent.monkey.patch_all()
except ImportError:
pass
from gunicorn.arbiter import Arbiter
from gunicorn.workers.ggevent import GeventWorker
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SpanExporter, ConsoleSpanExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource, Attributes
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.sdk.metrics.export import MetricExporter, PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
# required to share config across workers
preload_app = True # pylint: disable=C0103
# workaround for BatchSpanProcessor not being fork-safe
# https://opentelemetry-python.readthedocs.io/en/latest/examples/fork-process-model/README.html
def post_fork(server: Arbiter, worker: GeventWorker):
server.log.info('Worker spawned (pid: %s, worker_id: %s)', worker.pid, worker._worker_id) # type: ignore
# If workers are not distinguished within attributes, traces and
# metrics exported from each worker will be indistinguishable. While
# not necessarily an issue for traces, it is confusing for almost
# all metric types. A built-in way to identify a worker is by PID
# but this may lead to high label cardinality. This is caused by
# each worker generating its own metrics, so if they are not
# distinguished from each other, the metrics appear to go up and down
# https://github.com/open-telemetry/opentelemetry-python/issues/3001
OTEL_ENDPOINT = '...'
resource = Resource({
ResourceAttributes.SERVICE_NAME: 'backend',
ResourceAttributes.SERVICE_NAMESPACE: '...',
ResourceAttributes.SERVICE_VERSION: '1.0',
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: '...',
'gunicorn.worker_id': worker._worker_id
})
# setup open telemetry tracer provider
tracer_provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(OTEL_ENDPOINT, insecure=True)
tracer_provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(tracer_provider)
metric_exporter = OTLPMetricExporter(OTEL_ENDPOINT, insecure=True)
meter_provider = MeterProvider(resource=resource, metric_readers=[PeriodicExportingMetricReader(metric_exporter)])
metrics.set_meter_provider(meter_provider)
# Add worker IDs to worker objects. Much smaller metric cardinality than if using PIDs
# from: https://gist.github.com/hynek/ba655c8756924a5febc5285c712a7946
def on_starting(server: Arbiter):
"""Attach a set of IDs that can be temporarily re-used.
Used on reloads when each worker exists twice."""
server._worker_id_overload = set() # type: ignore
def nworkers_changed(server: Arbiter, new_value, _old_value):
"""Gets called on startup too.
Set the current number of workers. Required if we raise the worker count
temporarily using TTIN because server.cfg.workers won't be updated and if
one of those workers dies, we wouldn't know the ids go that far."""
server._worker_id_current_workers = new_value # type: ignore
def _next_worker_id(server: Arbiter):
"""If there are IDs open for re-use, take one. Else look for a free one."""
if server._worker_id_overload: # type: ignore
return server._worker_id_overload.pop() # type: ignore
in_use = {w._worker_id for w in (server.WORKERS.values()) if w.alive}
free = set(range(1, server._worker_id_current_workers + 1)) - in_use # type: ignore
return free.pop()
def on_reload(server: Arbiter):
"""Add a full set of ids into overload so it can be re-used once."""
server._worker_id_overload = set(range(1, server.cfg.workers + 1)) # type: ignore
def pre_fork(server: Arbiter, worker: GeventWorker):
"""Attach the next free worker_id before forking"""
worker._worker_id = _next_worker_id(server) # type: ignore
Startup script:
#!/bin/bash
# exit on any error
set -e
SCRIPT_DIR=$(dirname "$0")
if [ "$SCRIPT_DIR" = "." ]
then
    SCRIPT_DIR="$PWD"
fi
echo "Starting server"
PORT=8000
WORKERS=2
TIMEOUT=360
MAX_REQUEST=1000
MAX_REQ_JITTER=100
LOG_LEVEL="debug"
gunicorn "entrypoint:create_gunicorn()" \
--bind 0.0.0.0:$PORT \
--workers $WORKERS \
--timeout $TIMEOUT \
--log-level=$LOG_LEVEL \
--keep-alive=360 \
--backlog 8000 \
--max-requests=$MAX_REQUEST \
--max-requests-jitter=$MAX_REQ_JITTER \
--worker-class=gevent \
-c ./gunicorn.conf.py
send_requests.sh
#!/bin/bash
while true;
do
curl -L 127.0.0.1:8000/test --max-time 30 & curl -L 127.0.0.1:8000/test/2 --max-time 30
wait
done
Expected Result
All requests succeed and metrics are recorded
Actual Result
A small number of requests deadlock
Additional context
No response
Would you like to implement a fix?
No