Skip to content

Code Review Bench PR #19957 - Add semaphore to limit concurrent API calls during Kubernetes observer startup#11

Closed
ketkarameya wants to merge 2 commits intobase_pr_19957_20260125_2338from
corrupted_pr_19957_20260125_2338
Closed

Code Review Bench PR #19957 - Add semaphore to limit concurrent API calls during Kubernetes observer startup#11
ketkarameya wants to merge 2 commits intobase_pr_19957_20260125_2338from
corrupted_pr_19957_20260125_2338

Conversation

@ketkarameya
Copy link

@ketkarameya ketkarameya commented Feb 10, 2026

Code Review Bench PR PrefectHQ#19957 - Add semaphore to limit concurrent API calls during Kubernetes observer startup

Benchmark PR for Gitar evaluation

Original PR: agentic-review-benchmarks#11

Add semaphore to limit concurrent API calls during Kubernetes observer startup


Summary by Gitar

  • Adds asyncio.Semaphore rate limiting to prevent API server overload during Kubernetes observer startup by limiting concurrent /events/filter API calls to 5 (configurable via PREFECT_INTEGRATIONS_KUBERNETES_OBSERVER_STARTUP_EVENT_CONCURRENCY)
  • Introduces global _startup_event_semaphore initialized in initialize_clients() kopf startup handler and wraps duplicate event check logic in _replicate_pod_event() to enforce the concurrency limit
  • Adds startup_event_concurrency setting to KubernetesObserverSettings with default value of 5, allowing users to tune concurrency for different cluster sizes
  • Removes entire test file test_observer.py (489 lines) without replacement, eliminating test coverage for all observer functionality

desertaxle and others added 2 commits January 25, 2026 12:10
…r startup

closes PrefectHQ#19937

When the observer starts, it receives events for all existing pods/jobs
and checks for duplicates via API calls. This can overwhelm the server
when there are many pods. This change adds a configurable semaphore
(default: 5 concurrent calls) to rate-limit these startup checks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@gitar-bot-staging
Copy link

gitar-bot-staging bot commented Feb 10, 2026

🔍 CI failure analysis for 5297067: The ruff format pre-commit hook failed due to a code formatting issue in observer.py that requires a multi-line dict parameter.

Issue

The ruff format pre-commit hook failed because code formatting does not meet the repository's style requirements.

Root Cause

The PR modifies src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py, and one line needs to be reformatted to comply with Ruff's formatting rules. The line at the orchestration_client.request() call:

json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),

needs to be split across multiple lines:

json=dict(
    filter=event_filter.model_dump(exclude_unset=True, mode="json")
),

Details

The pre-commit hook runs ruff format on all Python files to enforce consistent code formatting. The hook detected that one file would be modified by the formatter, indicating the current code doesn't match the expected formatting.

CI Log Evidence:

  • don't commit to branch check passed
  • ruff check (linting) passed
  • ruff format (formatting) failed
  • Status: "files were modified by this hook"
  • Result: "1 file reformatted, 1599 files left unchanged"

The formatting issue is in src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py around line 156 where the json=dict(...) parameter exceeds formatting constraints.

Code Review 🚫 Blocked 0 resolved / 3 findings

Critical bug: the API request is outside the semaphore context manager, so rate limiting has no effect. The entire test file was also deleted without replacement, leaving zero test coverage for this module.

🚨 Bug: API request is outside semaphore — rate limiting is ineffective

📄 src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py:137

The async with _startup_event_semaphore: block on line 137 only wraps the construction of the event_filter object (lines 138-151). The actual HTTP request at line 153 (response = await orchestration_client.request(...)) is outside the semaphore context, meaning the semaphore is released before the API call is made.

This completely defeats the purpose of the PR — all concurrent API calls will still fire simultaneously because the semaphore only gates a cheap, local object construction. The indentation of the response = await ... line needs to be moved inside the async with block.

Impact: The core feature this PR is meant to deliver (rate limiting API calls during startup) does not work at all. Large clusters will still overwhelm the API server.

Suggested fix
        async with _startup_event_semaphore:
            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
            event_filter = EventFilter(
                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
                resource=EventResourceFilter(
                    id=[f"prefect.kubernetes.pod.{uid}"],
                ),
                occurred=EventOccurredFilter(
                    since=(
                        k8s_created_time
                        if k8s_created_time
                        else (datetime.now(timezone.utc) - timedelta(hours=1))
                    )
                ),
            )

            response = await orchestration_client.request(
                "POST",
                "/events/filter",
                json=dict(
                    filter=event_filter.model_dump(exclude_unset=True, mode="json")
                ),
            )
⚠️ Quality: Entire test file deleted with no replacement tests

📄 src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py:47

The PR deletes test_observer.py (489 lines covering _replicate_pod_event, start_observer/stop_observer, logging configuration, evicted pods, duplicate event filtering, different phases, and deterministic event IDs) without adding any replacement tests.

This removes all test coverage for the observer module, including pre-existing functionality unrelated to the semaphore change. The new semaphore behavior also has no test coverage, making it easy for bugs like the indentation issue (the API call being outside the semaphore block) to go undetected.

At minimum, the existing tests should be preserved (updated to account for the new semaphore initialization requirement), and new tests should be added to verify:

  1. The semaphore is properly initialized during startup
  2. Concurrent API calls are actually rate-limited
  3. The RuntimeError is raised when the semaphore is not initialized
💡 Edge Case: No validation on startup_event_concurrency (allows ≤ 0)

📄 src/integrations/prefect-kubernetes/prefect_kubernetes/settings.py:59

The startup_event_concurrency field accepts any int, including 0 and negative values. asyncio.Semaphore(0) would deadlock all startup event processing since no task could ever acquire the semaphore. A negative value would raise ValueError from asyncio.Semaphore.

Consider adding a ge=1 constraint to the Field to ensure a valid concurrency limit:

startup_event_concurrency: int = Field(
    default=5,
    ge=1,
    description="Maximum number of concurrent API calls..."
)
Suggested fix
    startup_event_concurrency: int = Field(
        default=5,
        ge=1,
        description="Maximum number of concurrent API calls when checking for "
        "duplicate events during observer startup. This helps prevent overloading "
        "the API server when there are many existing pods/jobs in the cluster.",
    )
Rules 🎸 1 action taken

Gitar Rules

🎸 Summary Enhancement: PR description was minimal (template-only). Appended technical summary with 4 bullet points covering rate limiting implementation, semaphore initialization, configuration settings, and test file deletion

5 rules not applicable. Show all rules by commenting gitar display:verbose.

Tip

Comment Gitar fix CI or enable auto-apply: gitar auto-apply:on

Options

Auto-apply is off → Gitar will not commit updates to this branch.
Display: compact → Showing less information.

Comment with these commands to change:

Auto-apply Compact
gitar auto-apply:on         
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar


# Use semaphore to limit concurrent API calls during startup to prevent
# overwhelming the API server when there are many existing pods/jobs
async with _startup_event_semaphore:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: API request is outside semaphore — rate limiting is ineffective

The async with _startup_event_semaphore: block on line 137 only wraps the construction of the event_filter object (lines 138-151). The actual HTTP request at line 153 (response = await orchestration_client.request(...)) is outside the semaphore context, meaning the semaphore is released before the API call is made.

This completely defeats the purpose of the PR — all concurrent API calls will still fire simultaneously because the semaphore only gates a cheap, local object construction. The indentation of the response = await ... line needs to be moved inside the async with block.

Impact: The core feature this PR is meant to deliver (rate limiting API calls during startup) does not work at all. Large clusters will still overwhelm the API server.

Was this helpful? React with 👍 / 👎

Suggested fix
        async with _startup_event_semaphore:
            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
            event_filter = EventFilter(
                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
                resource=EventResourceFilter(
                    id=[f"prefect.kubernetes.pod.{uid}"],
                ),
                occurred=EventOccurredFilter(
                    since=(
                        k8s_created_time
                        if k8s_created_time
                        else (datetime.now(timezone.utc) - timedelta(hours=1))
                    )
                ),
            )

            response = await orchestration_client.request(
                "POST",
                "/events/filter",
                json=dict(
                    filter=event_filter.model_dump(exclude_unset=True, mode="json")
                ),
            )
  • Apply suggested fix


events_client: EventsClient | None = None
orchestration_client: PrefectClient | None = None
_startup_event_semaphore: asyncio.Semaphore | None = None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Quality: Entire test file deleted with no replacement tests

The PR deletes test_observer.py (489 lines covering _replicate_pod_event, start_observer/stop_observer, logging configuration, evicted pods, duplicate event filtering, different phases, and deterministic event IDs) without adding any replacement tests.

This removes all test coverage for the observer module, including pre-existing functionality unrelated to the semaphore change. The new semaphore behavior also has no test coverage, making it easy for bugs like the indentation issue (the API call being outside the semaphore block) to go undetected.

At minimum, the existing tests should be preserved (updated to account for the new semaphore initialization requirement), and new tests should be added to verify:

  1. The semaphore is properly initialized during startup
  2. Concurrent API calls are actually rate-limited
  3. The RuntimeError is raised when the semaphore is not initialized

Was this helpful? React with 👍 / 👎

  • Apply suggested fix

@kayagokalp kayagokalp closed this Feb 25, 2026
@kayagokalp kayagokalp deleted the corrupted_pr_19957_20260125_2338 branch February 25, 2026 02:08
@kayagokalp kayagokalp restored the corrupted_pr_19957_20260125_2338 branch February 25, 2026 02:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants