
Add semaphore to limit concurrent API calls during Kubernetes observer startup#37

Open
tomerqodo wants to merge 2 commits into coderabbit_full_base_add_semaphore_to_limit_concurrent_api_calls_during_kubernetes_observer_startup_pr11 from coderabbit_full_head_add_semaphore_to_limit_concurrent_api_calls_during_kubernetes_observer_startup_pr11

Conversation

@tomerqodo

@tomerqodo tomerqodo commented Jan 29, 2026

Benchmark PR from agentic-review-benchmarks#11

Summary by CodeRabbit

New Features

  • Added configurable concurrency control for startup event processing to prevent API server overload when initializing the Kubernetes observer. A new setting startup_event_concurrency (default: 5) allows operators to limit concurrent API calls made during startup.

✏️ Tip: You can customize this high-level summary in your review settings.

desertaxle and others added 2 commits January 25, 2026 12:10
…r startup

closes PrefectHQ#19937

When the observer starts, it receives events for all existing pods/jobs
and checks for duplicates via API calls. This can overwhelm the server
when there are many pods. This change adds a configurable semaphore
(default: 5 concurrent calls) to rate-limit these startup checks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@coderabbitai

coderabbitai bot commented Jan 29, 2026

Walkthrough

Introduces a global semaphore mechanism to throttle concurrent startup event processing in the Kubernetes observer, configurable via a new startup_event_concurrency setting. The semaphore protects event-filter creation and API calls during observer startup. An existing test file is removed.

Changes

Cohort / File(s) Summary
Concurrency Control Implementation
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py
Adds global _startup_event_semaphore to throttle startup event processing. Initializes semaphore on observer startup using configurable concurrency limit. Wraps event-filter creation and API calls in semaphore context when handling startup events. Updates outbound POST payload structure to send precomputed event filter as JSON.
Configuration Settings
src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py
Adds new startup_event_concurrency: int field to KubernetesObserverSettings with default value of 5. Field controls maximum concurrent API calls during observer startup to prevent API server overload with many existing pods/jobs.
Test Removal
src/integrations/prefect-kubernetes/tests/test_observer.py
Entire test file removed, including comprehensive test coverage for pod event replication, deduplication, pod phase handling, evicted pod logic, observer lifecycle, and Kopf logging configuration.
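The pattern summarized above can be sketched with the stdlib alone. This is a minimal illustration, not the actual Prefect code: `init_observer`, `check_for_duplicate`, and `MAX_STARTUP_CONCURRENCY` are hypothetical stand-ins for the real observer startup hook, duplicate-check handler, and `startup_event_concurrency` setting.

```python
import asyncio
from typing import Optional

MAX_STARTUP_CONCURRENCY = 5  # stand-in for the startup_event_concurrency setting

# Module-level placeholder, populated once at observer startup
_startup_event_semaphore: Optional[asyncio.Semaphore] = None


def init_observer() -> None:
    """Create the shared semaphore once, at observer startup."""
    global _startup_event_semaphore
    _startup_event_semaphore = asyncio.Semaphore(MAX_STARTUP_CONCURRENCY)


async def check_for_duplicate(uid: str) -> bool:
    """Stand-in for the startup duplicate check against the events API."""
    if _startup_event_semaphore is None:
        raise RuntimeError("Startup event semaphore not initialized")
    async with _startup_event_semaphore:
        # Filter construction and the API call both sit inside the semaphore,
        # so at most MAX_STARTUP_CONCURRENCY requests are in flight at once.
        await asyncio.sleep(0.01)  # placeholder for POST /events/filter
        return False


async def main() -> list:
    init_observer()
    return await asyncio.gather(
        *(check_for_duplicate(f"pod-{i}") for i in range(20))
    )


results = asyncio.run(main())
print(len(results))
```

All twenty simulated checks complete, but no more than five are ever awaiting the placeholder call at the same time.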

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Poem

🐰 A semaphore hops into view,
Throttling startup events with grace so true,
Settings configured with care,
Concurrency flows—light as air!
Yet tests vanish in the night so blue. 🌙

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The pull request description is minimal and lacks the required sections from the template such as overview of changes, issue reference, and testing information. Add a comprehensive description including an overview of the changes, reference to issue #19937, and confirmation that tests were added or removed tests were addressed appropriately.
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately and specifically describes the main change: introducing a semaphore to limit concurrent API calls during Kubernetes observer startup.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py`:
- Around line 129-158: The semaphore only guards EventFilter construction
because the orchestration_client.request call is outside the async with block;
move the POST call to orchestration_client.request(...) inside the async with
_startup_event_semaphore block so the semaphore actually throttles concurrent
API calls, keeping the EventFilter creation and the subsequent await
orchestration_client.request(...) within the same async with scope (references:
_startup_event_semaphore, EventFilter, orchestration_client.request).

In `@src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py`:
- Around line 59-64: The startup_event_concurrency Field currently allows 0 or
negative values which can cause asyncio.Semaphore deadlock or raise ValueError;
update the Field definition for startup_event_concurrency in settings.py to
enforce a minimum of 1 (e.g., add ge=1 to the Field constraints) so that any
invalid config is rejected before passing the value to asyncio.Semaphore.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5376004 and 5297067.

📒 Files selected for processing (3)
  • src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py
  • src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py
  • src/integrations/prefect-kubernetes/tests/test_observer.py
💤 Files with no reviewable changes (1)
  • src/integrations/prefect-kubernetes/tests/test_observer.py
🧰 Additional context used
🧬 Code graph analysis (1)
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py (1)
src/prefect/events/clients.py (1)
  • get_events_client (89-117)
🪛 Ruff (0.14.14)
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py

133-133: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Pre-commit checks
🔇 Additional comments (2)
src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py (2)

47-47: LGTM. The global semaphore placeholder is a clear, minimal addition.


60-65: LGTM. Semaphore initialization is straightforward and ties cleanly to settings.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines 129 to +158
     if event_type is None:
         if orchestration_client is None:
             raise RuntimeError("Orchestration client not initialized")

-        # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
-        event_filter = EventFilter(
-            event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
-            resource=EventResourceFilter(
-                id=[f"prefect.kubernetes.pod.{uid}"],
-            ),
-            occurred=EventOccurredFilter(
-                since=(
-                    k8s_created_time
-                    if k8s_created_time
-                    else (datetime.now(timezone.utc) - timedelta(hours=1))
-                )
-            ),
-        )
+        if _startup_event_semaphore is None:
+            raise RuntimeError("Startup event semaphore not initialized")
+
+        # Use semaphore to limit concurrent API calls during startup to prevent
+        # overwhelming the API server when there are many existing pods/jobs
+        async with _startup_event_semaphore:
+            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
+            event_filter = EventFilter(
+                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
+                resource=EventResourceFilter(
+                    id=[f"prefect.kubernetes.pod.{uid}"],
+                ),
+                occurred=EventOccurredFilter(
+                    since=(
+                        k8s_created_time
+                        if k8s_created_time
+                        else (datetime.now(timezone.utc) - timedelta(hours=1))
+                    )
+                ),
+            )

         response = await orchestration_client.request(
             "POST",
             "/events/filter",
-            json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),
+            json=dict(
+                filter=event_filter.model_dump(exclude_unset=True, mode="json")
+            ),

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

Python asyncio semaphore async with block throttle concurrency

💡 Result:

To throttle (cap) concurrency in asyncio, wrap the work you want to limit in an asyncio.Semaphore(N) using async with. Entering the block acquires the semaphore (may wait if the counter is 0) and exiting the block releases it. (docs.python.org)

import asyncio

sem = asyncio.Semaphore(5)  # allow at most 5 concurrent workers

async def worker(x):
    async with sem:          # acquire -> run -> release
        await asyncio.sleep(1)
        return x * 2

async def main():
    results = await asyncio.gather(*(worker(i) for i in range(100)))
    print(results[:10])

asyncio.run(main())

Notes:

  • async with sem: is the intended usage pattern (equivalent to await sem.acquire() / sem.release() in a try/finally). (docs.python.org)
  • If you write async with sem as token:, token will be None because the semaphore context manager doesn’t return a useful value from __aenter__; you typically don’t use as with semaphores. (stackoverflow.com)
  • Use asyncio.BoundedSemaphore(N) if you want an error when something over-releases (helps catch bugs). (docs.python.org)


Semaphore doesn't throttle the API call.

The async with _startup_event_semaphore block wraps only the filter creation; the POST request happens outside it. Because exiting the async with block releases the semaphore immediately, concurrent API calls are never actually limited. Move the orchestration_client.request() call inside the semaphore block to throttle it.
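The review point can be demonstrated with a minimal stdlib sketch (the worker names, task count, and sleep duration are illustrative): a counter tracks how many simulated calls are in flight when the work sits outside versus inside the async with block.

```python
import asyncio


async def call_outside(sem: asyncio.Semaphore, tracker: dict) -> None:
    async with sem:
        pass  # only the cheap filter construction is guarded
    # Simulated API call OUTSIDE the semaphore: nothing limits it.
    tracker["active"] += 1
    tracker["peak"] = max(tracker["peak"], tracker["active"])
    await asyncio.sleep(0.05)
    tracker["active"] -= 1


async def call_inside(sem: asyncio.Semaphore, tracker: dict) -> None:
    async with sem:
        # Simulated API call INSIDE the semaphore: at most 5 run at once.
        tracker["active"] += 1
        tracker["peak"] = max(tracker["peak"], tracker["active"])
        await asyncio.sleep(0.05)
        tracker["active"] -= 1


async def measure(worker) -> int:
    sem = asyncio.Semaphore(5)
    tracker = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(sem, tracker) for _ in range(50)))
    return tracker["peak"]


outside_peak = asyncio.run(measure(call_outside))
inside_peak = asyncio.run(measure(call_inside))
print(outside_peak, inside_peak)
```

With fifty tasks, the outside variant's peak climbs well past the limit of five, while the inside variant never exceeds it — exactly the difference the suggested fix makes.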

🛠️ Suggested fix
-        async with _startup_event_semaphore:
-            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
-            event_filter = EventFilter(
-                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
-                resource=EventResourceFilter(
-                    id=[f"prefect.kubernetes.pod.{uid}"],
-                ),
-                occurred=EventOccurredFilter(
-                    since=(
-                        k8s_created_time
-                        if k8s_created_time
-                        else (datetime.now(timezone.utc) - timedelta(hours=1))
-                    )
-                ),
-            )
-
-        response = await orchestration_client.request(
-            "POST",
-            "/events/filter",
-            json=dict(
-                filter=event_filter.model_dump(exclude_unset=True, mode="json")
-            ),
-        )
+        async with _startup_event_semaphore:
+            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
+            event_filter = EventFilter(
+                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
+                resource=EventResourceFilter(
+                    id=[f"prefect.kubernetes.pod.{uid}"],
+                ),
+                occurred=EventOccurredFilter(
+                    since=(
+                        k8s_created_time
+                        if k8s_created_time
+                        else (datetime.now(timezone.utc) - timedelta(hours=1))
+                    )
+                ),
+            )
+
+            response = await orchestration_client.request(
+                "POST",
+                "/events/filter",
+                json=dict(
+                    filter=event_filter.model_dump(exclude_unset=True, mode="json")
+                ),
+            )
🧰 Tools
🪛 Ruff (0.14.14)

131-131: Avoid specifying long messages outside the exception class

(TRY003)


133-133: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py` around
lines 129 - 158, The semaphore only guards EventFilter construction because the
orchestration_client.request call is outside the async with block; move the POST
call to orchestration_client.request(...) inside the async with
_startup_event_semaphore block so the semaphore actually throttles concurrent
API calls, keeping the EventFilter creation and the subsequent await
orchestration_client.request(...) within the same async with scope (references:
_startup_event_semaphore, EventFilter, orchestration_client.request).

Comment on lines +59 to +64
    startup_event_concurrency: int = Field(
        default=5,
        description="Maximum number of concurrent API calls when checking for "
        "duplicate events during observer startup. This helps prevent overloading "
        "the API server when there are many existing pods/jobs in the cluster.",
    )

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py | sed -n '1,100p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 4456


🏁 Script executed:

rg "startup_event_concurrency" src/integrations/prefect-kubernetes/ -B 2 -A 5

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1721


🏁 Script executed:

ast-grep --pattern 'class $_ {
  $$$
  startup_event_concurrency $$$
  $$$
}'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 68


🏁 Script executed:

rg "startup_event_concurrency" --type py -B 3 -A 3

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 1596


🌐 Web query:

Pydantic v2 Field constraints ge greater than or equal minimum integer validation

💡 Result:

In Pydantic v2, to validate that an integer is greater than or equal to a minimum, use ge (“greater or equal”) on Field:

from pydantic import BaseModel, Field

class Model(BaseModel):
    min_year: int = Field(ge=1950)

Model(min_year=1950)   # ok
Model(min_year=1949)   # ValidationError
  • ge=... enforces value >= ... for numeric fields. In generated JSON Schema, ge becomes minimum. [1]
  • Field officially supports numeric constraints gt/ge/lt/le/multiple_of. [1][2]

If you also want to reject non-integers instead of coercing (e.g., reject "1950"), enable strict validation:

class Model(BaseModel):
    min_year: int = Field(ge=1950, strict=True)

(strict is a supported Field parameter.) [2]

Alternative: use conint(ge=...) when you prefer a constrained int type:

from pydantic import BaseModel, conint

class Model(BaseModel):
    min_year: conint(ge=1950)

(conint supports ge and other bounds.) [3]

Sources
[1] Pydantic docs – Fields / Numeric constraints: https://docs.pydantic.dev/2.1/usage/fields/
[2] Pydantic docs – Field API parameters: https://docs.pydantic.dev/latest/api/fields/
[3] Pydantic docs – conint type: https://docs.pydantic.dev/2.2/api/types/


🏁 Script executed:

cat -n src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py | sed -n '1,150p'

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 6747


🏁 Script executed:

rg "asyncio.Semaphore" src/integrations/prefect-kubernetes/ -B 5 -A 5

Repository: qodo-benchmark/prefect-combined-coderabbit

Length of output: 2231


Add validation constraint to prevent semaphore deadlock or initialization errors.

The startup_event_concurrency field is passed directly to asyncio.Semaphore() without validation. A value of 0 would block all event processing indefinitely; negative values would raise ValueError at semaphore initialization. Add ge=1 constraint to enforce a valid minimum.
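Both failure modes are easy to reproduce with the stdlib alone (the helper names here are illustrative): a negative value is rejected by asyncio.Semaphore at construction, and a value of 0 makes every acquire wait forever — cut short below with a timeout.

```python
import asyncio


def negative_value_message() -> str:
    """asyncio.Semaphore rejects negative initial values at construction."""
    try:
        asyncio.Semaphore(-1)
    except ValueError as exc:
        return str(exc)
    return "no error"


async def zero_blocks_forever() -> bool:
    """A 0-valued semaphore never grants acquire; a timeout exposes the hang."""
    sem = asyncio.Semaphore(0)
    try:
        await asyncio.wait_for(sem.acquire(), timeout=0.1)
        return False  # acquire unexpectedly succeeded
    except asyncio.TimeoutError:
        return True  # acquire never completed: an effective deadlock


err = negative_value_message()
blocked = asyncio.run(zero_blocks_forever())
print(err, blocked)
```

Adding ge=1 to the Field moves this failure from semaphore construction (or a silent hang) to a clear Pydantic ValidationError when the setting is loaded.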

🛠️ Suggested fix
     startup_event_concurrency: int = Field(
         default=5,
+        ge=1,
         description="Maximum number of concurrent API calls when checking for "
         "duplicate events during observer startup. This helps prevent overloading "
         "the API server when there are many existing pods/jobs in the cluster.",
     )
🤖 Prompt for AI Agents
In `@src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py` around
lines 59 - 64, The startup_event_concurrency Field currently allows 0 or
negative values which can cause asyncio.Semaphore deadlock or raise ValueError;
update the Field definition for startup_event_concurrency in settings.py to
enforce a minimum of 1 (e.g., add ge=1 to the Field constraints) so that any
invalid config is rejected before passing the value to asyncio.Semaphore.
