Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 28 additions & 16 deletions src/integrations/prefect-kubernetes/prefect_kubernetes/observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

events_client: EventsClient | None = None
orchestration_client: PrefectClient | None = None
_startup_event_semaphore: asyncio.Semaphore | None = None

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Quality: Entire test file deleted with no replacement tests

The PR deletes test_observer.py (489 lines covering _replicate_pod_event, start_observer/stop_observer, logging configuration, evicted pods, duplicate event filtering, different phases, and deterministic event IDs) without adding any replacement tests.

This removes all test coverage for the observer module, including pre-existing functionality unrelated to the semaphore change. The new semaphore behavior also has no test coverage, making it easy for bugs like the indentation issue (the API call being outside the semaphore block) to go undetected.

At minimum, the existing tests should be preserved (updated to account for the new semaphore initialization requirement), and new tests should be added to verify:

  1. The semaphore is properly initialized during startup
  2. Concurrent API calls are actually rate-limited
  3. The RuntimeError is raised when the semaphore is not initialized

Was this helpful? React with 👍 / 👎

  • Apply suggested fix



@kopf.on.startup()
Expand All @@ -56,8 +57,12 @@ async def initialize_clients(logger: kopf.Logger, **kwargs: Any):
logger.info("Initializing clients")
global events_client
global orchestration_client
global _startup_event_semaphore
orchestration_client = await get_client().__aenter__()
events_client = await get_events_client().__aenter__()
_startup_event_semaphore = asyncio.Semaphore(
settings.observer.startup_event_concurrency
)
logger.info("Clients successfully initialized")


Expand Down Expand Up @@ -124,26 +129,33 @@ async def _replicate_pod_event( # pyright: ignore[reportUnusedFunction]
if event_type is None:
if orchestration_client is None:
raise RuntimeError("Orchestration client not initialized")

# Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
event_filter = EventFilter(
event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
resource=EventResourceFilter(
id=[f"prefect.kubernetes.pod.{uid}"],
),
occurred=EventOccurredFilter(
since=(
k8s_created_time
if k8s_created_time
else (datetime.now(timezone.utc) - timedelta(hours=1))
)
),
)
if _startup_event_semaphore is None:
raise RuntimeError("Startup event semaphore not initialized")

# Use semaphore to limit concurrent API calls during startup to prevent
# overwhelming the API server when there are many existing pods/jobs
async with _startup_event_semaphore:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 Bug: API request is outside semaphore — rate limiting is ineffective

The async with _startup_event_semaphore: block on line 137 only wraps the construction of the event_filter object (lines 138-151). The actual HTTP request at line 153 (response = await orchestration_client.request(...)) is outside the semaphore context, meaning the semaphore is released before the API call is made.

This completely defeats the purpose of the PR — all concurrent API calls will still fire simultaneously because the semaphore only gates a cheap, local object construction. The indentation of the response = await ... line needs to be moved inside the async with block.

Impact: The core feature this PR is meant to deliver (rate limiting API calls during startup) does not work at all. Large clusters will still overwhelm the API server.

Was this helpful? React with 👍 / 👎

Suggested fix
        async with _startup_event_semaphore:
            # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
            event_filter = EventFilter(
                event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
                resource=EventResourceFilter(
                    id=[f"prefect.kubernetes.pod.{uid}"],
                ),
                occurred=EventOccurredFilter(
                    since=(
                        k8s_created_time
                        if k8s_created_time
                        else (datetime.now(timezone.utc) - timedelta(hours=1))
                    )
                ),
            )

            response = await orchestration_client.request(
                "POST",
                "/events/filter",
                json=dict(
                    filter=event_filter.model_dump(exclude_unset=True, mode="json")
                ),
            )
  • Apply suggested fix

# Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error
event_filter = EventFilter(
event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]),
resource=EventResourceFilter(
id=[f"prefect.kubernetes.pod.{uid}"],
),
occurred=EventOccurredFilter(
since=(
k8s_created_time
if k8s_created_time
else (datetime.now(timezone.utc) - timedelta(hours=1))
)
),
)

response = await orchestration_client.request(
"POST",
"/events/filter",
json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")),
json=dict(
filter=event_filter.model_dump(exclude_unset=True, mode="json")
),
)
# If the event already exists, we don't need to emit a new one.
if response.json()["events"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ class KubernetesObserverSettings(PrefectBaseSettings):
"should be provided in the format `key=value`.",
)

startup_event_concurrency: int = Field(
default=5,
description="Maximum number of concurrent API calls when checking for "
"duplicate events during observer startup. This helps prevent overloading "
"the API server when there are many existing pods/jobs in the cluster.",
)


class KubernetesWorkerSettings(PrefectBaseSettings):
model_config = build_settings_config(("integrations", "kubernetes", "worker"))
Expand Down
Loading
Loading