Conversation
Add concurrency gate and cache stampede protection to prevent 429/499 cascades

Adds a global ConcurrencyMiddleware (asyncio.Semaphore) that limits simultaneous request processing to prevent resource exhaustion under bot/attack traffic. Excess requests queue briefly, then get 503 instead of overwhelming the server.

Also fixes the cache invalidation thundering herd: model sync was invalidating the full catalog cache 35+ times (once per provider). Now it invalidates each provider individually during the loop and does the full/unique/stats invalidation only once at the end. Cache rebuild paths use threading.Lock to ensure only one thread rebuilds on a cache miss while others wait for the result.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits. |
This pull request has been ignored for the connected project Preview Branches by Supabase. |
Caution: Review failed. The pull request is closed.

📝 Walkthrough

The PR introduces server-level concurrency controls via new middleware and configuration variables, implements cache rebuild stampede protection using threading locks, and optimizes cache invalidation through a batch-mode parameter for provider syncing operations.
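The walkthrough does not name the new configuration variables; as a rough illustration only, knobs like these usually map onto the middleware's constructor arguments (`limit`, `queue_size`, `queue_timeout` appear in the constructor quoted later in this review) — the environment variable names below are hypothetical:

```python
import os

# Hypothetical environment variables feeding the middleware constructor;
# the actual names and defaults in this PR may differ.
CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", "100"))           # max in-flight requests
CONCURRENCY_QUEUE_SIZE = int(os.getenv("CONCURRENCY_QUEUE_SIZE", "50"))  # max queued requests
CONCURRENCY_QUEUE_TIMEOUT = float(os.getenv("CONCURRENCY_QUEUE_TIMEOUT", "5.0"))  # seconds to wait before 503
```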
Sequence Diagram(s)

```mermaid
sequenceDiagram
actor Client
participant Middleware as ConcurrencyMiddleware
participant Semaphore as asyncio.Semaphore
participant App as FastAPI App
participant Metrics as Prometheus Metrics
Client->>Middleware: HTTP Request
Middleware->>Semaphore: try_acquire (non-blocking)
alt Fast Path (Semaphore Available)
Semaphore-->>Middleware: ✓ Acquired
Middleware->>Metrics: concurrency_active_requests++
Middleware->>App: Call App
App-->>Middleware: Response
Middleware->>Metrics: concurrency_active_requests--
Middleware->>Semaphore: release()
Middleware-->>Client: Response
else Queued Path (Room in Queue)
Semaphore-->>Middleware: ✗ Not Available
Middleware->>Metrics: concurrency_queued_requests++
Middleware->>Semaphore: acquire (wait with timeout)
alt Timeout Expires
Semaphore-->>Middleware: Timeout
Middleware->>Metrics: concurrency_rejected_total++
Middleware-->>Client: 503 Service Unavailable (JSON)
else Semaphore Acquired
Semaphore-->>Middleware: ✓ Acquired
Middleware->>Metrics: concurrency_queued_requests--
Middleware->>Metrics: concurrency_active_requests++
Middleware->>App: Call App
App-->>Middleware: Response
Middleware->>Metrics: concurrency_active_requests--
Middleware->>Semaphore: release()
Middleware-->>Client: Response
end
else Queue Full
Semaphore-->>Middleware: Queue Full
Middleware->>Metrics: concurrency_rejected_total++
Middleware-->>Client: 503 Service Unavailable (JSON)
end
```

```mermaid
sequenceDiagram
participant Thread1 as Thread 1<br/>(Cache Miss)
participant LockMgr as Lock Manager
participant Thread2 as Thread 2<br/>(Also Missed)
participant Cache as Redis/Local Cache
participant DB as Database
Thread1->>LockMgr: Acquire rebuild lock
LockMgr-->>Thread1: Lock acquired
Thread1->>Cache: Double-check cache
alt Cache Hit (another thread rebuilt)
Cache-->>Thread1: Cache value exists
Thread1->>LockMgr: Release lock
Thread1-->>Thread1: Return cached value
else Cache Miss (rebuild needed)
Thread1->>DB: Fetch models
DB-->>Thread1: Models data
Thread1->>Cache: Store in Redis
Thread1->>Cache: Store in local cache
Thread1->>LockMgr: Release lock
Thread1-->>Thread1: Return models
par Concurrent Request
Thread2->>LockMgr: Acquire rebuild lock (blocked)
Thread1->>Thread1: Done
LockMgr-->>Thread2: Lock acquired
Thread2->>Cache: Double-check cache
Cache-->>Thread2: Cache hit (built by Thread1)
Thread2->>LockMgr: Release lock
Thread2-->>Thread2: Return cached value
end
end
```
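The flow above is double-checked locking: check the cache, take the lock, check again, and only then rebuild. A minimal self-contained sketch with threading.Lock — the cache store and fetch function here are simplified stand-ins, not the PR's actual code:

```python
import threading

_rebuild_lock = threading.Lock()
_local_cache: dict[str, object] = {}

def fetch_models_from_db() -> list[dict]:
    # Stand-in for the real database query.
    return [{"id": "example-model"}]

def get_cached_full_catalog() -> object:
    """Return the catalog, letting only one thread rebuild on a miss."""
    value = _local_cache.get("full_catalog")
    if value is not None:
        return value  # fast path: no lock taken on a hit

    with _rebuild_lock:
        # Double-check: another thread may have rebuilt while we waited.
        value = _local_cache.get("full_catalog")
        if value is not None:
            return value
        value = fetch_models_from_db()  # only one thread reaches the DB
        _local_cache["full_catalog"] = value
        return value
```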
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
```python
method = scope.get("method", "UNKNOWN")
concurrency_rejected.labels(reason="queue_full").inc()
logger.warning(
    f"Concurrency gate REJECT (queue full): {method} {path} "
    f"(active={self.limit - self.semaphore._value}, queued={self._waiting})"
)
await self._send_503(scope, send, "Server at capacity, please retry")
return

# Queue the request with timeout
self._waiting += 1
```
Bug: A race condition on the _waiting counter allows the request queue to grow beyond its configured queue_size limit because the check and increment operations are not atomic.
Severity: HIGH
Suggested Fix
Wrap the check of self._waiting and its subsequent increment in an async with self.lock: block, using an asyncio.Lock to ensure the operation is atomic and prevent multiple coroutines from concurrently passing the check.
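A minimal self-contained sketch of that fix, using the `async with self.lock` pattern the suggestion describes (the class and method names here are illustrative, not the PR's code):

```python
import asyncio

class ConcurrencyGate:
    """Illustrative fragment: atomic check-then-increment of the waiting counter."""

    def __init__(self, queue_size: int) -> None:
        self.queue_size = queue_size
        self._waiting = 0
        self.lock = asyncio.Lock()

    async def try_enqueue(self) -> bool:
        # Holding the lock makes the capacity check and the increment one
        # atomic step, so concurrent coroutines cannot all pass the check
        # before any of them has incremented the counter.
        async with self.lock:
            if self._waiting >= self.queue_size:
                return False  # queue full: caller should send the 503
            self._waiting += 1
            return True

    async def dequeue_one(self) -> None:
        async with self.lock:
            self._waiting -= 1
```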
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: src/middleware/concurrency_middleware.py#L112-L123
Potential issue: A race condition exists in the request queuing logic. Multiple
concurrent requests can check the `self._waiting` counter (line 112), find that the
queue is not full, and then proceed to increment the counter (line 123). Because there
is no synchronization mechanism like an `asyncio.Lock` to make the check-then-increment
operation atomic, and the first `await` point occurs after the increment, the
`queue_size` limit can be exceeded under high load. This undermines the middleware's
ability to protect the server from resource exhaustion by failing to properly limit the
number of queued requests.
```python
        await self.app(scope, receive, send)
    finally:
        self.semaphore.release()
        concurrency_active.dec()
    return
except Exception:
    # If the fast-path manipulation failed, fall through to normal path
    self.semaphore._value += 1
    concurrency_active.dec()
```
Bug: The except block in the fast-path logic is missing a return statement, causing execution to fall through and potentially send a second response, violating the ASGI protocol.
Severity: HIGH
Suggested Fix
Add a return statement at the end of the except Exception block (after line 109) to ensure that code execution stops after handling the exception and does not fall through to the request queuing logic.
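One possible shape of the fixed block, as a fragment only (surrounding code as quoted above; the updated comment text is illustrative):

```python
except Exception:
    # Undo the fast-path bookkeeping, then stop: falling through to the
    # queuing path could send a second response mid-stream (ASGI violation).
    self.semaphore._value += 1
    concurrency_active.dec()
    return
```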
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: src/middleware/concurrency_middleware.py#L101-L109
Potential issue: When an exception occurs within `await self.app(...)` in the fast-path
logic (line 101), particularly after a streaming response has already started, the outer
`except Exception` block (line 106) is executed. This block correctly reverts changes to
the semaphore but lacks a `return` statement. As a result, code execution falls through
to the subsequent queuing logic (line 112 onwards). This can lead to an attempt to send
a new 503 error response when a response has already been initiated, causing an ASGI
protocol violation and a server error.
```python
self.limit = limit
self.queue_size = queue_size
self.queue_timeout = queue_timeout
self._waiting = 0
```
self._waiting lacks thread-safety, creating race conditions in concurrent scenarios
```suggestion
self._waiting = 0
self._waiting_lock = asyncio.Lock()
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/middleware/concurrency_middleware.py
Line: 71:71
Comment:
`self._waiting` lacks thread-safety, creating race conditions in concurrent scenarios
How can I resolve this? If you propose a fix, please make it concise.

```python
acquired = self.semaphore._value > 0
if acquired:
    try:
        self.semaphore._value -= 1
        concurrency_active.inc()
        try:
            await self.app(scope, receive, send)
        finally:
            self.semaphore.release()
            concurrency_active.dec()
        return
    except Exception:
        # If the fast-path manipulation failed, fall through to normal path
        self.semaphore._value += 1
        concurrency_active.dec()
```
Directly accessing semaphore._value (private attribute) is brittle and breaks encapsulation. If the internal implementation changes, this will break. Consider using locked() instead or simply remove this optimization path.
```suggestion
# Try to acquire the semaphore without blocking. locked() is True when no
# slots remain; when a slot is free, acquire() completes synchronously
# (no await point), so no other coroutine can run between check and acquire.
acquired = not self.semaphore.locked()
if acquired:
    await self.semaphore.acquire()
    concurrency_active.inc()
    try:
        await self.app(scope, receive, send)
    finally:
        self.semaphore.release()
        concurrency_active.dec()
    return
```

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/middleware/concurrency_middleware.py
Line: 95:109
Comment:
Directly accessing `semaphore._value` (private attribute) is brittle and breaks encapsulation. If the internal implementation changes, this will break. Consider using `locked()` instead or simply remove this optimization path.
How can I resolve this? If you propose a fix, please make it concise.

```python
if self._waiting >= self.queue_size:
    method = scope.get("method", "UNKNOWN")
    concurrency_rejected.labels(reason="queue_full").inc()
    logger.warning(
        f"Concurrency gate REJECT (queue full): {method} {path} "
        f"(active={self.limit - self.semaphore._value}, queued={self._waiting})"
    )
    await self._send_503(scope, send, "Server at capacity, please retry")
    return
```
Race condition: self._waiting increment happens at line 123, but this check at line 112 reads it without synchronization. Multiple coroutines could all see _waiting < queue_size simultaneously and all increment it, exceeding the limit.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/middleware/concurrency_middleware.py
Line: 112:120
Comment:
Race condition: `self._waiting` increment happens at line 123, but this check at line 112 reads it without synchronization. Multiple coroutines could all see `_waiting < queue_size` simultaneously and all increment it, exceeding the limit.
How can I resolve this? If you propose a fix, please make it concise.

```python
# Queue the request with timeout
self._waiting += 1
concurrency_queued.inc()
wait_start = time.monotonic()

try:
    await asyncio.wait_for(
        self.semaphore.acquire(),
        timeout=self.queue_timeout,
    )
except asyncio.TimeoutError:
    self._waiting -= 1
    concurrency_queued.dec()
    method = scope.get("method", "UNKNOWN")
    wait_time = time.monotonic() - wait_start
    concurrency_rejected.labels(reason="queue_timeout").inc()
    logger.warning(
        f"Concurrency gate REJECT (queue timeout {wait_time:.1f}s): "
        f"{method} {path}"
    )
    await self._send_503(scope, send, "Server busy, please retry")
    return

# Acquired after waiting — process the request
self._waiting -= 1
concurrency_queued.dec()
concurrency_active.inc()

try:
    await self.app(scope, receive, send)
finally:
    self.semaphore.release()
    concurrency_active.dec()
```
If an exception occurs between line 148 (decrementing the active counter) and line 153 (releasing the semaphore), the semaphore won't be released but the metrics will be decremented, causing state drift.
```suggestion
# Queue the request with timeout
self._waiting += 1
concurrency_queued.inc()
wait_start = time.monotonic()
try:
    await asyncio.wait_for(
        self.semaphore.acquire(),
        timeout=self.queue_timeout,
    )
except asyncio.TimeoutError:
    self._waiting -= 1
    concurrency_queued.dec()
    method = scope.get("method", "UNKNOWN")
    wait_time = time.monotonic() - wait_start
    concurrency_rejected.labels(reason="queue_timeout").inc()
    logger.warning(
        f"Concurrency gate REJECT (queue timeout {wait_time:.1f}s): "
        f"{method} {path}"
    )
    await self._send_503(scope, send, "Server busy, please retry")
    return
# Acquired after waiting — process the request
self._waiting -= 1
concurrency_queued.dec()
concurrency_active.inc()
try:
    await self.app(scope, receive, send)
finally:
    # Decrement the metric before releasing so the two cannot drift apart
    concurrency_active.dec()
    self.semaphore.release()
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/middleware/concurrency_middleware.py
Line: 122:154
Comment:
If an exception occurs between line 148 (decrementing the active counter) and line 153 (releasing the semaphore), the semaphore won't be released but the metrics will be decremented, causing state drift.
How can I resolve this? If you propose a fix, please make it concise.
Add concurrency gate and cache stampede protection to prevent 429/499 cascades
Adds a global ConcurrencyMiddleware (asyncio.Semaphore) that limits simultaneous request processing to prevent resource exhaustion under bot/attack traffic. Excess requests queue briefly, then get 503 instead of overwhelming the server.
Also fixes the cache invalidation thundering herd: model sync was invalidating the full catalog cache 35+ times (once per provider). Now it invalidates each provider individually during the loop and does the full/unique/stats invalidation only once at the end. Cache rebuild paths use threading.Lock to ensure only one thread rebuilds on a cache miss while others wait for the result.
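As a rough, self-contained sketch of the concurrency-gate approach (pure ASGI, with metrics and exempt paths omitted) — this illustrates the technique under assumed defaults, not the PR's exact code, and the inline reviews above flag further hardening needed around the `_waiting` bookkeeping:

```python
import asyncio
import json

class ConcurrencyMiddleware:
    """ASGI middleware: cap in-flight requests, queue a bounded overflow
    briefly, and reply 503 when the queue is full or the wait times out."""

    def __init__(self, app, limit: int = 100, queue_size: int = 50,
                 queue_timeout: float = 5.0) -> None:
        self.app = app
        self.semaphore = asyncio.Semaphore(limit)
        self.queue_size = queue_size
        self.queue_timeout = queue_timeout
        self._waiting = 0

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Reject immediately when the overflow queue is already full.
        if self._waiting >= self.queue_size:
            await self._send_503(send, "Server at capacity, please retry")
            return

        # Wait briefly for a slot; time out with 503 instead of piling up.
        self._waiting += 1
        try:
            await asyncio.wait_for(self.semaphore.acquire(), self.queue_timeout)
        except asyncio.TimeoutError:
            self._waiting -= 1
            await self._send_503(send, "Server busy, please retry")
            return
        self._waiting -= 1

        try:
            await self.app(scope, receive, send)
        finally:
            self.semaphore.release()

    async def _send_503(self, send, message: str) -> None:
        body = json.dumps({"detail": message}).encode()
        await send({
            "type": "http.response.start",
            "status": 503,
            "headers": [(b"content-type", b"application/json"),
                        (b"retry-after", b"1")],
        })
        await send({"type": "http.response.body", "body": body})
```

With FastAPI/Starlette, a middleware like this would typically be registered via `app.add_middleware(ConcurrencyMiddleware, limit=100, queue_size=50, queue_timeout=5.0)`.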
Greptile Overview
Greptile Summary
Adds global concurrency limiting via ConcurrencyMiddleware (asyncio.Semaphore) to prevent resource exhaustion under high traffic, and implements cache stampede protection using threading locks to prevent redundant database queries after cache invalidation.

Key Changes:

- sync_all_providers now invalidates global caches (full_catalog, unique_models, stats) once at the end instead of 35+ times per provider, preventing the cache invalidation thundering herd.

Critical Issues Found:

- ConcurrencyMiddleware has race conditions in the _waiting counter (lines 71, 112-120, 122-154): multiple coroutines can increment it simultaneously, bypassing the queue size limit
- Direct access to semaphore._value (a private attribute) is brittle and breaks encapsulation (lines 95-109)

The cache stampede protection changes look solid, but the concurrency middleware needs thread-safety fixes before production use.
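For the batch-mode invalidation specifically, a minimal runnable sketch of the pattern — the stub names echo identifiers from this PR, while invalidate_provider_cache and all signatures are assumptions for illustration:

```python
# Stubs standing in for the real sync/cache helpers (signatures assumed).
async def sync_provider_models(provider: str, batch_mode: bool = False) -> None: ...
def invalidate_provider_cache(provider: str) -> None: ...  # hypothetical helper
def invalidate_full_catalog() -> None: ...
def invalidate_unique_models() -> None: ...
def invalidate_catalog_stats() -> None: ...

async def sync_all_providers(providers: list[str]) -> None:
    """Sync every provider, deferring global cache invalidation to the end."""
    for provider in providers:
        # batch_mode=True suppresses the full/unique/stats invalidation that
        # the per-provider sync would otherwise trigger on every iteration.
        await sync_provider_models(provider, batch_mode=True)
        invalidate_provider_cache(provider)  # drop only this provider's entry

    # Global caches are invalidated exactly once instead of 35+ times, so at
    # most one full catalog rebuild is triggered by the whole sync run.
    invalidate_full_catalog()
    invalidate_unique_models()
    invalidate_catalog_stats()
```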
Confidence Score: 2/5

- ConcurrencyMiddleware: the _waiting counter lacks synchronization (allowing queue overflow), unsafe access to the private semaphore._value attribute, and potential state drift from exception handling. While the cache stampede protection is well-implemented, the concurrency gate could fail in production and actually make cascading failures worse by allowing more requests through than intended.
- src/middleware/concurrency_middleware.py: all race conditions must be fixed before merging
Important Files Changed

- Race conditions in the _waiting counter and unsafe semaphore._value access; needs thread-safety fixes and proper exception handling.
- Stampede protection for get_cached_full_catalog, get_cached_unique_models, and provider/gateway catalog functions.
- batch_mode parameter to prevent redundant cache invalidations.
- sync_all_providers now invalidates global caches once at the end instead of per-provider, eliminating the thundering herd.

Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant ConcurrencyMW as Concurrency Middleware
    participant Semaphore
    participant App as FastAPI App
    participant Cache as Model Catalog Cache
    participant Lock as Threading Lock
    participant DB as Database

    Note over Client,DB: Request Flow with Concurrency Gate
    Client->>ConcurrencyMW: HTTP Request
    alt Exempt Path (/health, /metrics)
        ConcurrencyMW->>App: Pass through immediately
        App-->>Client: Response
    else Non-Exempt Path
        ConcurrencyMW->>Semaphore: Check availability
        alt Slot Available (Fast Path)
            Semaphore-->>ConcurrencyMW: Acquire (non-blocking)
            Note over ConcurrencyMW: Increment active counter
            ConcurrencyMW->>App: Process request
            App-->>ConcurrencyMW: Response
            Note over ConcurrencyMW: Decrement active counter
            ConcurrencyMW->>Semaphore: Release
            ConcurrencyMW-->>Client: Response
        else Slots Full
            alt Queue Not Full
                Note over ConcurrencyMW: Increment waiting counter
                ConcurrencyMW->>Semaphore: Acquire (with timeout)
                alt Acquired Before Timeout
                    Semaphore-->>ConcurrencyMW: Acquired
                    Note over ConcurrencyMW: Decrement waiting, increment active
                    ConcurrencyMW->>App: Process request
                    App-->>ConcurrencyMW: Response
                    Note over ConcurrencyMW: Decrement active counter
                    ConcurrencyMW->>Semaphore: Release
                    ConcurrencyMW-->>Client: Response
                else Timeout
                    Semaphore-->>ConcurrencyMW: TimeoutError
                    Note over ConcurrencyMW: Decrement waiting counter
                    ConcurrencyMW-->>Client: 503 Service Unavailable
                end
            else Queue Full
                ConcurrencyMW-->>Client: 503 Service Unavailable (immediate)
            end
        end
    end

    Note over Client,DB: Cache Stampede Protection Flow
    Client->>App: GET /catalog/models
    App->>Cache: get_cached_full_catalog()
    Cache->>Cache: Check Redis (MISS)
    Cache->>Cache: Check Local Memory (MISS)
    Cache->>Lock: Acquire rebuild lock
    Lock-->>Cache: Lock acquired
    Cache->>Cache: Double-check Redis
    alt Cache Rebuilt by Another Thread
        Cache-->>App: Return cached data
    else Still Missing
        Cache->>DB: Fetch all models
        DB-->>Cache: Model data
        Cache->>Cache: Transform to API format
        Cache->>Cache: Populate Redis + Local
        Cache-->>App: Return fresh data
    end
    Cache->>Lock: Release lock
    App-->>Client: Model catalog response

    Note over Client,DB: Model Sync with Batch Cache Invalidation
    participant Sync as Model Sync Service
    Sync->>Sync: sync_all_providers(batch_mode=True)
    loop For Each Provider
        Sync->>Sync: sync_provider_models(batch_mode=True)
        Sync->>DB: Upsert provider models
        DB-->>Sync: Success
        Sync->>Cache: Invalidate provider-specific cache only
        Note over Sync,Cache: Skip full/unique/stats invalidation
    end
    Note over Sync,Cache: After ALL providers synced
    Sync->>Cache: invalidate_full_catalog()
    Sync->>Cache: invalidate_unique_models()
    Sync->>Cache: invalidate_catalog_stats()
    Note over Sync,Cache: Global invalidation happens ONCE (not 35+ times)
```