
fix: add concurrency gate #1071

Merged
Jay-tech456 merged 1 commit into main from fix/apply-timeout-429 on Feb 8, 2026

Conversation

@Jay-tech456 (Contributor) commented Feb 8, 2026

Add concurrency gate and cache stampede protection to prevent 429/499 cascades

Adds a global ConcurrencyMiddleware (asyncio.Semaphore) that limits simultaneous request processing to prevent resource exhaustion under bot/attack traffic. Excess requests queue briefly, then get 503 instead of overwhelming the server.
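To make the discussion below easier to follow, here is a minimal sketch of the gating idea (illustrative names, not the PR's exact middleware, which additionally tracks a bounded wait queue, exempt paths, and Prometheus metrics):

```python
import asyncio

from starlette.responses import JSONResponse


class ConcurrencyGate:
    """Minimal sketch: cap in-flight requests, queue briefly, 503 on timeout."""

    def __init__(self, app, limit: int = 20, queue_timeout: float = 10.0):
        self.app = app
        self.semaphore = asyncio.Semaphore(limit)
        self.queue_timeout = queue_timeout

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        try:
            # Wait up to queue_timeout seconds for a free slot.
            await asyncio.wait_for(self.semaphore.acquire(), self.queue_timeout)
        except asyncio.TimeoutError:
            response = JSONResponse(
                {"detail": "Server busy, please retry"}, status_code=503
            )
            await response(scope, receive, send)
            return
        try:
            await self.app(scope, receive, send)
        finally:
            self.semaphore.release()
```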

Also fixes the cache invalidation thundering herd: model sync was invalidating the full catalog cache 35+ times (once per provider). Now it invalidates each provider individually during the loop and performs the full/unique/stats invalidation only once at the end. Cache rebuild paths use a threading.Lock to ensure only one thread rebuilds on a cache miss while others wait for the result.
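The rebuild path uses the classic double-checked locking pattern; roughly (a sketch with illustrative names; fetch_all_models_from_db is a hypothetical stand-in, and the real code also consults Redis before the local cache):

```python
import threading

_rebuild_lock = threading.Lock()  # the PR uses per-cache locks, e.g. _rebuild_lock_full_catalog
_local_cache = {}


def fetch_all_models_from_db():
    """Hypothetical stand-in for the real database query."""
    return [{"id": "provider/model", "context_length": 8192}]


def get_cached_full_catalog():
    catalog = _local_cache.get("full_catalog")
    if catalog is not None:              # fast path: hit, no lock taken
        return catalog
    with _rebuild_lock:                  # only one thread rebuilds at a time
        catalog = _local_cache.get("full_catalog")
        if catalog is not None:          # double-check: a peer already rebuilt it
            return catalog
        catalog = fetch_all_models_from_db()
        _local_cache["full_catalog"] = catalog  # real code also populates Redis
        return catalog
```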

Summary by CodeRabbit

  • New Features
    • Added server-side concurrency controls with configurable request limits, queue sizes, and timeouts via environment variables.
    • Server now rejects excess requests with appropriate 503 responses when concurrency limits are reached.
    • Implemented cache stampede protection for model catalog operations, improving performance during heavy load scenarios.
    • Optimized cache invalidation during bulk model synchronization to reduce unnecessary cascading updates.

Greptile Summary

Adds global concurrency limiting via ConcurrencyMiddleware (asyncio.Semaphore) to prevent resource exhaustion under high traffic, and implements cache stampede protection using threading locks to prevent redundant database queries after cache invalidation.

Key Changes:

  • Concurrency Gate: New middleware limits simultaneous request processing (default: 20 active, 50 queued, 10s timeout). Returns 503 on overload instead of 429 since this is capacity protection, not rate limiting.
  • Cache Stampede Fix: Model catalog cache rebuild now uses threading locks with double-checked locking pattern, ensuring only one thread rebuilds on cache miss while others wait.
  • Batch Invalidation Optimization: sync_all_providers now invalidates global caches (full_catalog, unique_models, stats) once at the end instead of 35+ times per provider, preventing cache invalidation thundering herd.
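In code shape, the batch optimization amounts to roughly the following sketch (function names follow the PR description and the sequence diagrams; the helpers are no-op stand-ins for the real cache/sync services):

```python
# No-op stand-ins so the sketch runs; the real helpers live in
# src/services/model_catalog_cache.py and model_catalog_sync.py.
def upsert_models(provider): pass
def invalidate_provider_cache(provider): pass
def invalidate_full_catalog(): pass
def invalidate_unique_models(): pass
def invalidate_catalog_stats(): pass


def sync_provider_models(provider, batch_mode=False):
    upsert_models(provider)
    invalidate_provider_cache(provider)  # always: this provider's cache only
    if not batch_mode:
        # Old behaviour (and still the single-provider path): full cascade.
        invalidate_full_catalog()
        invalidate_unique_models()
        invalidate_catalog_stats()


def sync_all_providers(providers):
    for provider in providers:
        sync_provider_models(provider, batch_mode=True)
    # Global caches are flushed exactly once, after all providers synced,
    # instead of 35+ times (once per provider).
    invalidate_full_catalog()
    invalidate_unique_models()
    invalidate_catalog_stats()
```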

Critical Issues Found:

  • ConcurrencyMiddleware has race conditions in the _waiting counter (lines 71, 112-120, 122-154) - multiple coroutines can increment it simultaneously, bypassing the queue size limit
  • Direct access to semaphore._value (private attribute) is brittle and breaks encapsulation (lines 95-109)
  • Exception handling between metrics decrement and semaphore release could cause state drift (lines 150-154)

The cache stampede protection changes look solid, but the concurrency middleware needs thread-safety fixes before production use.

Confidence Score: 2/5

  • This PR has critical concurrency bugs in the new middleware that could cause overload protection to fail under heavy traffic
  • Score reflects multiple critical race conditions in ConcurrencyMiddleware: the _waiting counter lacks synchronization (allowing queue overflow), unsafe access to private semaphore._value attribute, and potential state drift from exception handling. While the cache stampede protection is well-implemented, the concurrency gate could fail in production and actually make cascading failures worse by allowing more requests through than intended.
  • Pay close attention to src/middleware/concurrency_middleware.py - all race conditions must be fixed before merging

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/middleware/concurrency_middleware.py | New concurrency middleware with critical race conditions in the _waiting counter and unsafe semaphore._value access. Needs thread-safety fixes and proper exception handling. |
| src/config/config.py | Adds three new configuration parameters for concurrency control with reasonable defaults (limit=20, queue=50, timeout=10.0s). |
| src/main.py | Registers the new concurrency middleware with config-based parameters. Integration looks correct. |
| src/services/model_catalog_cache.py | Adds threading locks for cache stampede protection. Implements the double-checked locking pattern correctly for get_cached_full_catalog, get_cached_unique_models, and the provider/gateway catalog functions. |
| src/services/model_catalog_sync.py | Introduces a batch_mode parameter to prevent redundant cache invalidations. sync_all_providers now invalidates global caches once at the end instead of per-provider, eliminating the thundering herd. |
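For context, the src/main.py wiring presumably reduces to a standard add_middleware call; a sketch (the import paths and Config attribute access are assumptions based on the file list above):

```python
from fastapi import FastAPI

# Assumed import paths, following the file list above.
from src.config.config import Config
from src.middleware.concurrency_middleware import ConcurrencyMiddleware

app = FastAPI()
app.add_middleware(
    ConcurrencyMiddleware,
    limit=Config.CONCURRENCY_LIMIT,                  # default 20
    queue_size=Config.CONCURRENCY_QUEUE_SIZE,        # default 50
    queue_timeout=Config.CONCURRENCY_QUEUE_TIMEOUT,  # default 10.0 s
)
```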

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant ConcurrencyMW as Concurrency Middleware
    participant Semaphore
    participant App as FastAPI App
    participant Cache as Model Catalog Cache
    participant Lock as Threading Lock
    participant DB as Database

    Note over Client,DB: Request Flow with Concurrency Gate

    Client->>ConcurrencyMW: HTTP Request
    
    alt Exempt Path (/health, /metrics)
        ConcurrencyMW->>App: Pass through immediately
        App-->>Client: Response
    else Non-Exempt Path
        ConcurrencyMW->>Semaphore: Check availability
        
        alt Slot Available (Fast Path)
            Semaphore-->>ConcurrencyMW: Acquire (non-blocking)
            Note over ConcurrencyMW: Increment active counter
            ConcurrencyMW->>App: Process request
            App-->>ConcurrencyMW: Response
            Note over ConcurrencyMW: Decrement active counter
            ConcurrencyMW->>Semaphore: Release
            ConcurrencyMW-->>Client: Response
            
        else Slots Full
            alt Queue Not Full
                Note over ConcurrencyMW: Increment waiting counter
                ConcurrencyMW->>Semaphore: Acquire (with timeout)
                
                alt Acquired Before Timeout
                    Semaphore-->>ConcurrencyMW: Acquired
                    Note over ConcurrencyMW: Decrement waiting, increment active
                    ConcurrencyMW->>App: Process request
                    App-->>ConcurrencyMW: Response
                    Note over ConcurrencyMW: Decrement active counter
                    ConcurrencyMW->>Semaphore: Release
                    ConcurrencyMW-->>Client: Response
                else Timeout
                    Semaphore-->>ConcurrencyMW: TimeoutError
                    Note over ConcurrencyMW: Decrement waiting counter
                    ConcurrencyMW-->>Client: 503 Service Unavailable
                end
                
            else Queue Full
                ConcurrencyMW-->>Client: 503 Service Unavailable (immediate)
            end
        end
    end

    Note over Client,DB: Cache Stampede Protection Flow

    Client->>App: GET /catalog/models
    App->>Cache: get_cached_full_catalog()
    Cache->>Cache: Check Redis (MISS)
    Cache->>Cache: Check Local Memory (MISS)
    
    Cache->>Lock: Acquire rebuild lock
    Lock-->>Cache: Lock acquired
    
    Cache->>Cache: Double-check Redis
    alt Cache Rebuilt by Another Thread
        Cache-->>App: Return cached data
    else Still Missing
        Cache->>DB: Fetch all models
        DB-->>Cache: Model data
        Cache->>Cache: Transform to API format
        Cache->>Cache: Populate Redis + Local
        Cache-->>App: Return fresh data
    end
    
    Cache->>Lock: Release lock
    App-->>Client: Model catalog response

    Note over Client,DB: Model Sync with Batch Cache Invalidation

    participant Sync as Model Sync Service
    
    Sync->>Sync: sync_all_providers(batch_mode=True)
    
    loop For Each Provider
        Sync->>Sync: sync_provider_models(batch_mode=True)
        Sync->>DB: Upsert provider models
        DB-->>Sync: Success
        Sync->>Cache: Invalidate provider-specific cache only
        Note over Sync,Cache: Skip full/unique/stats invalidation
    end
    
    Note over Sync,Cache: After ALL providers synced
    Sync->>Cache: invalidate_full_catalog()
    Sync->>Cache: invalidate_unique_models()
    Sync->>Cache: invalidate_catalog_stats()
    Note over Sync,Cache: Global invalidation happens ONCE (not 35+ times)
```

…9/499 cascades

Adds a global ConcurrencyMiddleware (asyncio.Semaphore) that limits simultaneous
request processing to prevent resource exhaustion under bot/attack traffic.
Excess requests queue briefly then get 503 instead of overwhelming the server.

Also fixes the cache invalidation thundering herd: model sync was invalidating
the full catalog cache 35+ times (once per provider). Now invalidates each
provider individually during the loop and does full/unique/stats invalidation
only once at the end. Cache rebuild paths use threading.Lock to ensure only one
thread rebuilds on cache miss while others wait for the result.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@chatgpt-codex-connector

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.
Credits must be used to enable repository wide code reviews.

@supabase

supabase bot commented Feb 8, 2026

This pull request has been ignored for the connected project ynleroehyrmaafkgjgmr because there are no changes detected in supabase/supabase directory. You can change this behaviour in Project Integrations Settings ↗︎.


Preview Branches by Supabase.
Learn more about Supabase Branching ↗︎.

Jay-tech456 merged commit 36cda2c into main on Feb 8, 2026
6 of 7 checks passed
@coderabbitai

coderabbitai bot commented Feb 8, 2026

Caution

Review failed

The pull request is closed.

Walkthrough

The PR introduces server-level concurrency controls via new middleware and configuration variables, implements cache rebuild stampede protection using threading locks, and optimizes cache invalidation through a batch mode parameter for provider syncing operations.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| Concurrency Control Configuration<br>src/config/config.py, src/main.py | Added three environment-configurable settings (CONCURRENCY_LIMIT, CONCURRENCY_QUEUE_SIZE, CONCURRENCY_QUEUE_TIMEOUT) to the Config class; wired ConcurrencyMiddleware into the FastAPI app with these configuration parameters. |
| Concurrency Middleware<br>src/middleware/concurrency_middleware.py | New ASGI middleware enforcing global server-wide concurrency gates using asyncio.Semaphore. Includes queueing logic with timeout-based rejection (503 responses), exempt paths for monitoring endpoints, and Prometheus metrics tracking active/queued/rejected requests. |
| Cache Stampede Protection<br>src/services/model_catalog_cache.py | Introduced thread-level locks (_rebuild_lock_full_catalog, _rebuild_lock_unique_models, _rebuild_locks_provider) with a double-check pattern to prevent concurrent cache rebuilds. Protects the full catalog, provider/gateway catalogs, and unique models paths. |
| Cache Invalidation Optimization<br>src/services/model_catalog_sync.py | Added a batch_mode parameter to sync_provider_models() to control cache invalidation scope. In batch mode, only the provider-specific Redis cache is invalidated; when disabled, the full cascade (provider, unique_models, catalog_stats, full_catalog) occurs. Added global post-sync invalidation after all providers sync. |
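The three settings are plain environment-variable reads; a sketch of what the Config additions likely look like (defaults taken from the summaries above; the exact parsing style is an assumption):

```python
import os


class Config:
    # Maximum requests processed concurrently before new ones queue.
    CONCURRENCY_LIMIT: int = int(os.getenv("CONCURRENCY_LIMIT", "20"))
    # Maximum requests allowed to wait for a slot; beyond this, immediate 503.
    CONCURRENCY_QUEUE_SIZE: int = int(os.getenv("CONCURRENCY_QUEUE_SIZE", "50"))
    # Seconds a queued request may wait before being rejected with 503.
    CONCURRENCY_QUEUE_TIMEOUT: float = float(os.getenv("CONCURRENCY_QUEUE_TIMEOUT", "10.0"))
```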

Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Middleware as ConcurrencyMiddleware
    participant Semaphore as asyncio.Semaphore
    participant App as FastAPI App
    participant Metrics as Prometheus Metrics

    Client->>Middleware: HTTP Request
    Middleware->>Semaphore: try_acquire (non-blocking)
    
    alt Fast Path (Semaphore Available)
        Semaphore-->>Middleware: ✓ Acquired
        Middleware->>Metrics: concurrency_active_requests++
        Middleware->>App: Call App
        App-->>Middleware: Response
        Middleware->>Metrics: concurrency_active_requests--
        Middleware->>Semaphore: release()
        Middleware-->>Client: Response
    else Queued Path (Room in Queue)
        Semaphore-->>Middleware: ✗ Not Available
        Middleware->>Metrics: concurrency_queued_requests++
        Middleware->>Semaphore: acquire (wait with timeout)
        
        alt Timeout Expires
            Semaphore-->>Middleware: Timeout
            Middleware->>Metrics: concurrency_rejected_total++
            Middleware-->>Client: 503 Service Unavailable (JSON)
        else Semaphore Acquired
            Semaphore-->>Middleware: ✓ Acquired
            Middleware->>Metrics: concurrency_queued_requests--
            Middleware->>Metrics: concurrency_active_requests++
            Middleware->>App: Call App
            App-->>Middleware: Response
            Middleware->>Metrics: concurrency_active_requests--
            Middleware->>Semaphore: release()
            Middleware-->>Client: Response
        end
    else Queue Full
        Semaphore-->>Middleware: Queue Full
        Middleware->>Metrics: concurrency_rejected_total++
        Middleware-->>Client: 503 Service Unavailable (JSON)
    end
```
```mermaid
sequenceDiagram
    participant Thread1 as Thread 1<br/>(Cache Miss)
    participant LockMgr as Lock Manager
    participant Thread2 as Thread 2<br/>(Also Missed)
    participant Cache as Redis/Local Cache
    participant DB as Database

    Thread1->>LockMgr: Acquire rebuild lock
    LockMgr-->>Thread1: Lock acquired
    Thread1->>Cache: Double-check cache
    
    alt Cache Hit (another thread rebuilt)
        Cache-->>Thread1: Cache value exists
        Thread1->>LockMgr: Release lock
        Thread1-->>Thread1: Return cached value
    else Cache Miss (rebuild needed)
        Thread1->>DB: Fetch models
        DB-->>Thread1: Models data
        Thread1->>Cache: Store in Redis
        Thread1->>Cache: Store in local cache
        Thread1->>LockMgr: Release lock
        Thread1-->>Thread1: Return models
        
        par Concurrent Request
            Thread2->>LockMgr: Acquire rebuild lock (blocked)
            Thread1->>Thread1: Done
            LockMgr-->>Thread2: Lock acquired
            Thread2->>Cache: Double-check cache
            Cache-->>Thread2: Cache hit (built by Thread1)
            Thread2->>LockMgr: Release lock
            Thread2-->>Thread2: Return cached value
        end
    end
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 A rabbit hops through queues with care,
Semaphores guard the path so fair,
Locks prevent the stampede's roar,
As caches keep from burning more!
Concurrency gates now stand so tall,
This bouncy code protects us all! 🎯



Comment on lines +113 to +123
```python
            method = scope.get("method", "UNKNOWN")
            concurrency_rejected.labels(reason="queue_full").inc()
            logger.warning(
                f"Concurrency gate REJECT (queue full): {method} {path} "
                f"(active={self.limit - self.semaphore._value}, queued={self._waiting})"
            )
            await self._send_503(scope, send, "Server at capacity, please retry")
            return

        # Queue the request with timeout
        self._waiting += 1
```

Bug: A race condition on the _waiting counter allows the request queue to grow beyond its configured queue_size limit because the check and increment operations are not atomic.
Severity: HIGH

Suggested Fix

Wrap the check of self._waiting and its subsequent increment in an async with self.lock: block, using an asyncio.Lock to ensure the operation is atomic and prevent multiple coroutines from concurrently passing the check.
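Concretely, the fix might look like this fragment of __call__ (a sketch, not the reviewer's exact patch; it assumes self._waiting_lock = asyncio.Lock() is created in __init__, as the review below also suggests):

```python
        # Fragment of __call__: make check-then-increment atomic across coroutines.
        async with self._waiting_lock:
            queue_full = self._waiting >= self.queue_size
            if not queue_full:
                self._waiting += 1  # queue slot reserved under the lock
        if queue_full:
            concurrency_rejected.labels(reason="queue_full").inc()
            await self._send_503(scope, send, "Server at capacity, please retry")
            return
```

Note the 503 is sent outside the lock, so a slow client cannot stall other coroutines waiting to check the queue.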

Prompt for AI Agent

Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: src/middleware/concurrency_middleware.py#L112-L123

Potential issue: A race condition exists in the request queuing logic. Multiple concurrent requests can check the `self._waiting` counter (line 112), find that the queue is not full, and then proceed to increment the counter (line 123). Because there is no synchronization mechanism like an `asyncio.Lock` to make the check-then-increment operation atomic, and the first `await` point occurs after the increment, the `queue_size` limit can be exceeded under high load. This undermines the middleware's ability to protect the server from resource exhaustion by failing to properly limit the number of queued requests.


Comment on lines +101 to +109
```python
                    await self.app(scope, receive, send)
                finally:
                    self.semaphore.release()
                    concurrency_active.dec()
                return
            except Exception:
                # If the fast-path manipulation failed, fall through to normal path
                self.semaphore._value += 1
                concurrency_active.dec()
```

Bug: The except block in the fast-path logic is missing a return statement, causing execution to fall through and potentially send a second response, violating the ASGI protocol.
Severity: HIGH

Suggested Fix

Add a return statement at the end of the except Exception block (after line 109) to ensure that code execution stops after handling the exception and does not fall through to the request queuing logic.
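In context, the fix is a single return at the end of the except block (a fragment of the fast-path handler shown above):

```python
            except Exception:
                # Fast-path bookkeeping failed: roll back the manual decrement.
                self.semaphore._value += 1
                concurrency_active.dec()
                return  # never fall through once a response may have started
```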

Prompt for AI Agent

Review the code at the location below. A potential bug has been identified by an AI agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not valid.

Location: src/middleware/concurrency_middleware.py#L101-L109

Potential issue: When an exception occurs within `await self.app(...)` in the fast-path logic (line 101), particularly after a streaming response has already started, the outer `except Exception` block (line 106) is executed. This block correctly reverts changes to the semaphore but lacks a `return` statement. As a result, code execution falls through to the subsequent queuing logic (line 112 onwards). This can lead to an attempt to send a new 503 error response when a response has already been initiated, causing an ASGI protocol violation and a server error.

@greptile-apps greptile-apps bot left a comment (Contributor)


5 files reviewed, 4 comments


Comment on line +71 in src/middleware/concurrency_middleware.py

```python
        self.limit = limit
        self.queue_size = queue_size
        self.queue_timeout = queue_timeout
        self._waiting = 0
```

self._waiting lacks thread-safety, creating race conditions in concurrent scenarios

Suggested change (adding alongside the existing counter):

```python
        self._waiting = 0
        self._waiting_lock = asyncio.Lock()
```

Comment on lines +95 to +109 in src/middleware/concurrency_middleware.py
```python
        acquired = self.semaphore._value > 0
        if acquired:
            try:
                self.semaphore._value -= 1
                concurrency_active.inc()
                try:
                    await self.app(scope, receive, send)
                finally:
                    self.semaphore.release()
                    concurrency_active.dec()
                return
            except Exception:
                # If the fast-path manipulation failed, fall through to normal path
                self.semaphore._value += 1
                concurrency_active.dec()
```

Directly accessing semaphore._value (private attribute) is brittle and breaks encapsulation. If the internal implementation changes, this will break. Consider using locked() instead or simply remove this optimization path.

Suggested change (replacing the fast-path block quoted above):

```python
        # Try to acquire the semaphore without blocking
        try:
            acquired = self.semaphore.acquire_nowait()
        except ValueError:
            # Semaphore at capacity, proceed to queue
            acquired = False

        if acquired:
            concurrency_active.inc()
            try:
                await self.app(scope, receive, send)
            finally:
                self.semaphore.release()
                concurrency_active.dec()
            return
```
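One caution about the suggestion as written: asyncio.Semaphore exposes locked() but, to our knowledge, no acquire_nowait() method (that API exists on asyncio.Queue). A locked()-based version of the same fast path, per the comment's own alternative, might look like this sketch:

```python
        # Non-blocking fast path using only public API. locked() is True when
        # no slot is free; with no await between this check and acquire(),
        # the event loop cannot interleave another coroutine here, so the
        # acquire() completes immediately.
        if not self.semaphore.locked():
            await self.semaphore.acquire()
            concurrency_active.inc()
            try:
                await self.app(scope, receive, send)
            finally:
                self.semaphore.release()
                concurrency_active.dec()
            return
```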

Comment on lines +112 to +120 in src/middleware/concurrency_middleware.py
```python
        if self._waiting >= self.queue_size:
            method = scope.get("method", "UNKNOWN")
            concurrency_rejected.labels(reason="queue_full").inc()
            logger.warning(
                f"Concurrency gate REJECT (queue full): {method} {path} "
                f"(active={self.limit - self.semaphore._value}, queued={self._waiting})"
            )
            await self._send_503(scope, send, "Server at capacity, please retry")
            return
```

Race condition: self._waiting increment happens at line 123, but this check at line 112 reads it without synchronization. Multiple coroutines could all see _waiting < queue_size simultaneously and all increment it, exceeding the limit.


Comment on lines +122 to +154 in src/middleware/concurrency_middleware.py
```python
        # Queue the request with timeout
        self._waiting += 1
        concurrency_queued.inc()
        wait_start = time.monotonic()

        try:
            await asyncio.wait_for(
                self.semaphore.acquire(),
                timeout=self.queue_timeout,
            )
        except asyncio.TimeoutError:
            self._waiting -= 1
            concurrency_queued.dec()
            method = scope.get("method", "UNKNOWN")
            wait_time = time.monotonic() - wait_start
            concurrency_rejected.labels(reason="queue_timeout").inc()
            logger.warning(
                f"Concurrency gate REJECT (queue timeout {wait_time:.1f}s): "
                f"{method} {path}"
            )
            await self._send_503(scope, send, "Server busy, please retry")
            return

        # Acquired after waiting — process the request
        self._waiting -= 1
        concurrency_queued.dec()
        concurrency_active.inc()

        try:
            await self.app(scope, receive, send)
        finally:
            self.semaphore.release()
            concurrency_active.dec()
```

If exception occurs between line 148 (decrement active) and line 153 (release semaphore), the semaphore won't be released but metrics will be decremented, causing state drift.

Suggested change (replacing the block quoted above; the final try/finally now decrements the metric before releasing the semaphore):

```python
        # Queue the request with timeout
        self._waiting += 1
        concurrency_queued.inc()
        wait_start = time.monotonic()

        try:
            await asyncio.wait_for(
                self.semaphore.acquire(),
                timeout=self.queue_timeout,
            )
        except asyncio.TimeoutError:
            self._waiting -= 1
            concurrency_queued.dec()
            method = scope.get("method", "UNKNOWN")
            wait_time = time.monotonic() - wait_start
            concurrency_rejected.labels(reason="queue_timeout").inc()
            logger.warning(
                f"Concurrency gate REJECT (queue timeout {wait_time:.1f}s): "
                f"{method} {path}"
            )
            await self._send_503(scope, send, "Server busy, please retry")
            return

        # Acquired after waiting — process the request
        self._waiting -= 1
        concurrency_queued.dec()
        concurrency_active.inc()

        try:
            await self.app(scope, receive, send)
        except Exception:
            raise
        finally:
            concurrency_active.dec()
            self.semaphore.release()
```
