Code Review Bench PR #19957 - Add semaphore to limit concurrent API calls during Kubernetes observer startup#11
Conversation
…r startup closes PrefectHQ#19937 When the observer starts, it receives events for all existing pods/jobs and checks for duplicates via API calls. This can overwhelm the server when there are many pods. This change adds a configurable semaphore (default: 5 concurrent calls) to rate-limit these startup checks. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
🔍 CI failure analysis for 5297067: The ruff format pre-commit hook failed due to a code formatting issue in observer.py that requires a multi-line dict parameter.IssueThe Root CauseThe PR modifies json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),needs to be split across multiple lines: json=dict(
filter=event_filter.model_dump(exclude_unset=True, mode="json")
),DetailsThe pre-commit hook runs CI Log Evidence:
The formatting issue is in Code Review 🚫 Blocked 0 resolved / 3 findingsCritical bug: the API request is outside the semaphore context manager, so rate limiting has no effect. The entire test file was also deleted without replacement, leaving zero test coverage for this module. 🚨 Bug: API request is outside semaphore — rate limiting is ineffective📄 src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py:137 The This completely defeats the purpose of the PR — all concurrent API calls will still fire simultaneously because the semaphore only gates a cheap, local object construction. The indentation of the Impact: The core feature this PR is meant to deliver (rate limiting API calls during startup) does not work at all. Large clusters will still overwhelm the API server. Suggested fix
|
| Auto-apply | Compact |
|
|
Was this helpful? React with 👍 / 👎 | Gitar
|
|
||
| # Use semaphore to limit concurrent API calls during startup to prevent | ||
| # overwhelming the API server when there are many existing pods/jobs | ||
| async with _startup_event_semaphore: |
There was a problem hiding this comment.
🚨 Bug: API request is outside semaphore — rate limiting is ineffective
The async with _startup_event_semaphore: block on line 137 only wraps the construction of the event_filter object (lines 138-151). The actual HTTP request at line 153 (response = await orchestration_client.request(...)) is outside the semaphore context, meaning the semaphore is released before the API call is made.
This completely defeats the purpose of the PR — all concurrent API calls will still fire simultaneously because the semaphore only gates a cheap, local object construction. The indentation of the response = await ... line needs to be moved inside the async with block.
Impact: The core feature this PR is meant to deliver (rate limiting API calls during startup) does not work at all. Large clusters will still overwhelm the API server.
Was this helpful? React with 👍 / 👎
Suggested fix
async with _startup_event_semaphore:
# Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
event_filter = EventFilter(
event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
resource=EventResourceFilter(
id=[f"prefect.kubernetes.pod.{uid}"],
),
occurred=EventOccurredFilter(
since=(
k8s_created_time
if k8s_created_time
else (datetime.now(timezone.utc) - timedelta(hours=1))
)
),
)
response = await orchestration_client.request(
"POST",
"/events/filter",
json=dict(
filter=event_filter.model_dump(exclude_unset=True, mode="json")
),
)
- Apply suggested fix
|
|
||
| events_client: EventsClient | None = None | ||
| orchestration_client: PrefectClient | None = None | ||
| _startup_event_semaphore: asyncio.Semaphore | None = None |
There was a problem hiding this comment.
⚠️ Quality: Entire test file deleted with no replacement tests
The PR deletes test_observer.py (489 lines covering _replicate_pod_event, start_observer/stop_observer, logging configuration, evicted pods, duplicate event filtering, different phases, and deterministic event IDs) without adding any replacement tests.
This removes all test coverage for the observer module, including pre-existing functionality unrelated to the semaphore change. The new semaphore behavior also has no test coverage, making it easy for bugs like the indentation issue (the API call being outside the semaphore block) to go undetected.
At minimum, the existing tests should be preserved (updated to account for the new semaphore initialization requirement), and new tests should be added to verify:
- The semaphore is properly initialized during startup
- Concurrent API calls are actually rate-limited
- The
RuntimeErroris raised when the semaphore is not initialized
Was this helpful? React with 👍 / 👎
- Apply suggested fix
Code Review Bench PR PrefectHQ#19957 - Add semaphore to limit concurrent API calls during Kubernetes observer startup
Benchmark PR for Gitar evaluation
Original PR: agentic-review-benchmarks#11
Add semaphore to limit concurrent API calls during Kubernetes observer startup
Summary by Gitar
/events/filterAPI calls to 5 (configurable viaPREFECT_INTEGRATIONS_KUBERNETES_OBSERVER_STARTUP_EVENT_CONCURRENCY)_startup_event_semaphoreinitialized ininitialize_clients()kopf startup handler and wraps duplicate event check logic in_replicate_pod_event()to enforce the concurrency limitstartup_event_concurrencysetting toKubernetesObserverSettingswith default value of 5, allowing users to tune concurrency for different cluster sizestest_observer.py(489 lines) without replacement, eliminating test coverage for all observer functionality