Add semaphore to limit concurrent API calls during Kubernetes observer startup#37
Conversation
Closes PrefectHQ#19937.

When the observer starts, it receives events for all existing pods/jobs and checks for duplicates via API calls. This can overwhelm the server when there are many pods. This change adds a configurable semaphore (default: 5 concurrent calls) to rate-limit these startup checks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
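The rate-limiting approach described above can be sketched with a plain `asyncio.Semaphore`. Everything below (`duplicate_check`, `run_startup_burst`, the in-flight counter) is an illustrative stand-in for the observer's duplicate-check calls, not the actual PR code:

```python
import asyncio

async def duplicate_check(sem: asyncio.Semaphore, state: dict) -> None:
    """Stand-in for one startup duplicate-check API call."""
    async with sem:
        # Track how many "calls" are in flight at once.
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated server round-trip
        state["active"] -= 1

async def run_startup_burst(n_events: int = 50, limit: int = 5) -> int:
    """Fire a burst of startup events; return the peak number of concurrent calls."""
    sem = asyncio.Semaphore(limit)  # default 5, like startup_event_concurrency
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(duplicate_check(sem, state) for _ in range(n_events)))
    return state["peak"]

print(asyncio.run(run_startup_burst()))  # never exceeds the limit of 5
```

Even with 50 simultaneous events, the peak in-flight count stays at the semaphore's limit, which is exactly the back-pressure this change adds.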
Walkthrough

Introduces a global semaphore to throttle concurrent startup event processing in the Kubernetes observer, configurable via a new `startup_event_concurrency` setting.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed
❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py`:
- Around line 129-158: The semaphore only guards EventFilter construction
because the orchestration_client.request call is outside the async with block;
move the POST call to orchestration_client.request(...) inside the async with
_startup_event_semaphore block so the semaphore actually throttles concurrent
API calls, keeping the EventFilter creation and the subsequent await
orchestration_client.request(...) within the same async with scope (references:
_startup_event_semaphore, EventFilter, orchestration_client.request).
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py`:
- Around line 59-64: The startup_event_concurrency Field currently allows 0 or
negative values which can cause asyncio.Semaphore deadlock or raise ValueError;
update the Field definition for startup_event_concurrency in settings.py to
enforce a minimum of 1 (e.g., add ge=1 to the Field constraints) so that any
invalid config is rejected before passing the value to asyncio.Semaphore.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py
- src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py
- src/integrations/prefect-kubernetes/tests/test_observer.py
💤 Files with no reviewable changes (1)
- src/integrations/prefect-kubernetes/tests/test_observer.py
🧰 Additional context used
🧬 Code graph analysis (1)
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py (1)
src/prefect/events/clients.py (1)
get_events_client(89-117)
🪛 Ruff (0.14.14)
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py
133-133: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Pre-commit checks
🔇 Additional comments (2)
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py (2)
47-47: LGTM. The global semaphore placeholder is a clear, minimal addition.
60-65: LGTM. Semaphore initialization is straightforward and ties cleanly to settings.
```diff
     if event_type is None:
         if orchestration_client is None:
             raise RuntimeError("Orchestration client not initialized")
 
-        # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
-        event_filter = EventFilter(
-            event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
-            resource=EventResourceFilter(
-                id=[f"prefect.kubernetes.pod.{uid}"],
-            ),
-            occurred=EventOccurredFilter(
-                since=(
-                    k8s_created_time
-                    if k8s_created_time
-                    else (datetime.now(timezone.utc) - timedelta(hours=1))
-                )
-            ),
-        )
+        if _startup_event_semaphore is None:
+            raise RuntimeError("Startup event semaphore not initialized")
+
+        # Use semaphore to limit concurrent API calls during startup to prevent
+        # overwhelming the API server when there are many existing pods/jobs
+        async with _startup_event_semaphore:
+            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
+            event_filter = EventFilter(
+                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
+                resource=EventResourceFilter(
+                    id=[f"prefect.kubernetes.pod.{uid}"],
+                ),
+                occurred=EventOccurredFilter(
+                    since=(
+                        k8s_created_time
+                        if k8s_created_time
+                        else (datetime.now(timezone.utc) - timedelta(hours=1))
+                    )
+                ),
+            )
 
         response = await orchestration_client.request(
             "POST",
             "/events/filter",
-            json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),
+            json=dict(
+                filter=event_filter.model_dump(exclude_unset=True, mode="json")
+            ),
         )
```
🧩 Analysis chain
🌐 Web query:
Python asyncio semaphore async with block throttle concurrency
💡 Result:
To throttle (cap) concurrency in asyncio, wrap the work you want to limit in an asyncio.Semaphore(N) using async with. Entering the block acquires the semaphore (may wait if the counter is 0) and exiting the block releases it. (docs.python.org)
```python
import asyncio

sem = asyncio.Semaphore(5)  # allow at most 5 concurrent workers

async def worker(x):
    async with sem:  # acquire -> run -> release
        await asyncio.sleep(1)
        return x * 2

async def main():
    results = await asyncio.gather(*(worker(i) for i in range(100)))
    print(results[:10])

asyncio.run(main())
```

Notes:
- `async with sem:` is the intended usage pattern (equivalent to `await sem.acquire()` / `sem.release()` in a `try/finally`). (docs.python.org)
- If you write `async with sem as token:`, `token` will be `None` because the semaphore context manager doesn't return a useful value from `__aenter__`; you typically don't use `as` with semaphores. (stackoverflow.com)
- Use `asyncio.BoundedSemaphore(N)` if you want an error when something over-releases (helps catch bugs). (docs.python.org)
Citations:
- 1: https://docs.python.org/3.12/library/asyncio-sync.html
- 2: https://docs.python.org/3.12/library/asyncio-sync.html
- 3: https://stackoverflow.com/questions/75959529/asyncio-semaphore-cannot-be-used-with-the-async-with-as-syntax?utm_source=openai
- 4: https://docs.python.org/3.12/library/asyncio-sync.html
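The `BoundedSemaphore` note above can be demonstrated in a few lines: a plain `Semaphore` silently absorbs an extra `release()`, while `BoundedSemaphore` raises `ValueError`.

```python
import asyncio

def over_release_demo() -> str:
    plain = asyncio.Semaphore(1)
    plain.release()  # counter silently becomes 2; the bug goes unnoticed

    bounded = asyncio.BoundedSemaphore(1)
    try:
        bounded.release()  # releasing more than acquired is an error
    except ValueError:
        return "over-release detected"
    return "over-release missed"

print(over_release_demo())  # → over-release detected
```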
Semaphore doesn't throttle the API call.
The `async with _startup_event_semaphore` block wraps only the filter creation; the POST request happens outside the block. Because the semaphore is released as soon as the block exits, concurrent API calls are not actually limited. Move the `orchestration_client.request()` call inside the semaphore block so it is throttled.
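A minimal reproduction of this failure mode, with illustrative names: when only cheap setup sits inside the `async with`, the semaphore is released before the expensive call, so concurrency is effectively unbounded.

```python
import asyncio

async def buggy_task(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:
        pass  # only setup is guarded; the semaphore is released right here
    # The simulated API call below runs OUTSIDE the semaphore.
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1

async def measure_peak(n: int = 50, limit: int = 5) -> int:
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}
    await asyncio.gather(*(buggy_task(sem, state) for _ in range(n)))
    return state["peak"]

print(asyncio.run(measure_peak()))  # far above the limit of 5
```

Moving the expensive `await` inside the `async with` block is what restores the intended cap.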
🛠️ Suggested fix
```diff
         async with _startup_event_semaphore:
             # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
             event_filter = EventFilter(
                 event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
                 resource=EventResourceFilter(
                     id=[f"prefect.kubernetes.pod.{uid}"],
                 ),
                 occurred=EventOccurredFilter(
                     since=(
                         k8s_created_time
                         if k8s_created_time
                         else (datetime.now(timezone.utc) - timedelta(hours=1))
                     )
                 ),
             )
 
-        response = await orchestration_client.request(
-            "POST",
-            "/events/filter",
-            json=dict(
-                filter=event_filter.model_dump(exclude_unset=True, mode="json")
-            ),
-        )
+            response = await orchestration_client.request(
+                "POST",
+                "/events/filter",
+                json=dict(
+                    filter=event_filter.model_dump(exclude_unset=True, mode="json")
+                ),
+            )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    if event_type is None:
        if orchestration_client is None:
            raise RuntimeError("Orchestration client not initialized")

        if _startup_event_semaphore is None:
            raise RuntimeError("Startup event semaphore not initialized")

        # Use semaphore to limit concurrent API calls during startup to prevent
        # overwhelming the API server when there are many existing pods/jobs
        async with _startup_event_semaphore:
            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
            event_filter = EventFilter(
                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
                resource=EventResourceFilter(
                    id=[f"prefect.kubernetes.pod.{uid}"],
                ),
                occurred=EventOccurredFilter(
                    since=(
                        k8s_created_time
                        if k8s_created_time
                        else (datetime.now(timezone.utc) - timedelta(hours=1))
                    )
                ),
            )

            response = await orchestration_client.request(
                "POST",
                "/events/filter",
                json=dict(
                    filter=event_filter.model_dump(exclude_unset=True, mode="json")
                ),
            )
```
🧰 Tools
🪛 Ruff (0.14.14)
131-131: Avoid specifying long messages outside the exception class
(TRY003)
133-133: Avoid specifying long messages outside the exception class
(TRY003)
```python
    startup_event_concurrency: int = Field(
        default=5,
        description="Maximum number of concurrent API calls when checking for "
        "duplicate events during observer startup. This helps prevent overloading "
        "the API server when there are many existing pods/jobs in the cluster.",
    )
```
🧩 Analysis chain
🏁 Script executed:
```shell
cat -n src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py | sed -n '1,100p'
```

Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 4456
🏁 Script executed:
rg "startup_event_concurrency" src/integrations/prefect-kubernetes/ -B 2 -A 5Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 1721
🏁 Script executed:
```shell
ast-grep --pattern 'class $_ {
  $$$
  startup_event_concurrency $$$
  $$$
}'
```

Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 68
🏁 Script executed:
rg "startup_event_concurrency" --type py -B 3 -A 3Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 1596
🌐 Web query:
Pydantic v2 Field constraints ge greater than or equal minimum integer validation
💡 Result:
In Pydantic v2, to validate that an integer is greater than or equal to a minimum, use `ge` ("greater or equal") on `Field`:

```python
from pydantic import BaseModel, Field

class Model(BaseModel):
    min_year: int = Field(ge=1950)

Model(min_year=1950)  # ok
Model(min_year=1949)  # ValidationError
```

`ge=...` enforces `value >= ...` for numeric fields. In generated JSON Schema, `ge` becomes `minimum`. [1] `Field` officially supports the numeric constraints `gt`/`ge`/`lt`/`le`/`multiple_of`. [1][2]
If you also want to reject non-integers instead of coercing (e.g., reject "1950"), enable strict validation:

```python
class Model(BaseModel):
    min_year: int = Field(ge=1950, strict=True)
```

(`strict` is a supported `Field` parameter.) [2]
Alternative: use `conint(ge=...)` when you prefer a constrained int type:

```python
from pydantic import BaseModel, conint

class Model(BaseModel):
    min_year: conint(ge=1950)
```

(`conint` supports `ge` and other bounds.) [3]
Sources
[1] Pydantic docs – Fields / Numeric constraints: https://docs.pydantic.dev/2.1/usage/fields/
[2] Pydantic docs – Field API parameters: https://docs.pydantic.dev/latest/api/fields/
[3] Pydantic docs – conint type: https://docs.pydantic.dev/2.2/api/types/
🏁 Script executed:
```shell
cat -n src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py | sed -n '1,150p'
```

Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 6747
🏁 Script executed:
rg "asyncio.Semaphore" src/integrations/prefect-kubernetes/ -B 5 -A 5Repository: qodo-benchmark/prefect-combined-coderabbit
Length of output: 2231
Add a validation constraint to prevent semaphore deadlock or initialization errors.
The `startup_event_concurrency` field is passed directly to `asyncio.Semaphore()` without validation. A value of 0 would block all event processing indefinitely, and a negative value would raise `ValueError` at semaphore initialization. Add a `ge=1` constraint to enforce a valid minimum.
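Both failure modes called out here are easy to confirm with stdlib asyncio alone (no Pydantic needed for the demonstration):

```python
import asyncio

# Failure mode 1: a negative initial value is rejected by asyncio itself.
def negative_value_raises() -> bool:
    try:
        asyncio.Semaphore(-1)
        return False
    except ValueError:
        return True

# Failure mode 2: a zero-valued semaphore never admits anyone,
# so every startup duplicate-check would hang forever.
async def zero_value_hangs() -> bool:
    sem = asyncio.Semaphore(0)
    try:
        await asyncio.wait_for(sem.acquire(), timeout=0.05)
        return False  # would mean acquire succeeded
    except asyncio.TimeoutError:
        return True

print(negative_value_raises())          # → True
print(asyncio.run(zero_value_hangs()))  # → True
```

With `ge=1` on the Field, both invalid configurations are rejected at settings-load time, before the semaphore is ever constructed.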
🛠️ Suggested fix

```diff
     startup_event_concurrency: int = Field(
         default=5,
+        ge=1,
         description="Maximum number of concurrent API calls when checking for "
         "duplicate events during observer startup. This helps prevent overloading "
         "the API server when there are many existing pods/jobs in the cluster.",
     )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
    startup_event_concurrency: int = Field(
        default=5,
        ge=1,
        description="Maximum number of concurrent API calls when checking for "
        "duplicate events during observer startup. This helps prevent overloading "
        "the API server when there are many existing pods/jobs in the cluster.",
    )
```
Benchmark PR from agentic-review-benchmarks#11
Summary by CodeRabbit
New Features
- New setting `startup_event_concurrency` (default: 5) allows operators to limit concurrent API calls made during startup.