-
Notifications
You must be signed in to change notification settings - Fork 0
Code Review Bench PR #19957 - Add semaphore to limit concurrent API calls during Kubernetes observer startup #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,7 @@ | |
|
|
||
| events_client: EventsClient | None = None | ||
| orchestration_client: PrefectClient | None = None | ||
| _startup_event_semaphore: asyncio.Semaphore | None = None | ||
|
|
||
|
|
||
| @kopf.on.startup() | ||
|
|
@@ -56,8 +57,12 @@ async def initialize_clients(logger: kopf.Logger, **kwargs: Any): | |
| logger.info("Initializing clients") | ||
| global events_client | ||
| global orchestration_client | ||
| global _startup_event_semaphore | ||
| orchestration_client = await get_client().__aenter__() | ||
| events_client = await get_events_client().__aenter__() | ||
| _startup_event_semaphore = asyncio.Semaphore( | ||
| settings.observer.startup_event_concurrency | ||
| ) | ||
| logger.info("Clients successfully initialized") | ||
|
|
||
|
|
||
|
|
@@ -124,26 +129,33 @@ async def _replicate_pod_event( # pyright: ignore[reportUnusedFunction] | |
| if event_type is None: | ||
| if orchestration_client is None: | ||
| raise RuntimeError("Orchestration client not initialized") | ||
|
|
||
| # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error | ||
| event_filter = EventFilter( | ||
| event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]), | ||
| resource=EventResourceFilter( | ||
| id=[f"prefect.kubernetes.pod.{uid}"], | ||
| ), | ||
| occurred=EventOccurredFilter( | ||
| since=( | ||
| k8s_created_time | ||
| if k8s_created_time | ||
| else (datetime.now(timezone.utc) - timedelta(hours=1)) | ||
| ) | ||
| ), | ||
| ) | ||
| if _startup_event_semaphore is None: | ||
| raise RuntimeError("Startup event semaphore not initialized") | ||
|
|
||
| # Use semaphore to limit concurrent API calls during startup to prevent | ||
| # overwhelming the API server when there are many existing pods/jobs | ||
| async with _startup_event_semaphore: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚨 Bug: API request is outside semaphore — rate limiting is ineffectiveThe This completely defeats the purpose of the PR — all concurrent API calls will still fire simultaneously because the semaphore only gates a cheap, local object construction. The indentation of the Impact: The core feature this PR is meant to deliver (rate limiting API calls during startup) does not work at all. Large clusters will still overwhelm the API server. Was this helpful? React with 👍 / 👎 Suggested fix
|
||
| # Use the Kubernetes event timestamp for the filter to avoid "Query time range is too large" error | ||
| event_filter = EventFilter( | ||
| event=EventNameFilter(name=[f"prefect.kubernetes.pod.{phase.lower()}"]), | ||
| resource=EventResourceFilter( | ||
| id=[f"prefect.kubernetes.pod.{uid}"], | ||
| ), | ||
| occurred=EventOccurredFilter( | ||
| since=( | ||
| k8s_created_time | ||
| if k8s_created_time | ||
| else (datetime.now(timezone.utc) - timedelta(hours=1)) | ||
| ) | ||
| ), | ||
| ) | ||
|
|
||
| response = await orchestration_client.request( | ||
| "POST", | ||
| "/events/filter", | ||
| json=dict(filter=event_filter.model_dump(exclude_unset=True, mode="json")), | ||
| json=dict( | ||
| filter=event_filter.model_dump(exclude_unset=True, mode="json") | ||
| ), | ||
| ) | ||
| # If the event already exists, we don't need to emit a new one. | ||
| if response.json()["events"]: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR deletes
test_observer.py(489 lines covering_replicate_pod_event,start_observer/stop_observer, logging configuration, evicted pods, duplicate event filtering, different phases, and deterministic event IDs) without adding any replacement tests.This removes all test coverage for the observer module, including pre-existing functionality unrelated to the semaphore change. The new semaphore behavior also has no test coverage, making it easy for bugs like the indentation issue (the API call being outside the semaphore block) to go undetected.
At minimum, the existing tests should be preserved (updated to account for the new semaphore initialization requirement), and new tests should be added to verify:
RuntimeErroris raised when the semaphore is not initializedWas this helpful? React with 👍 / 👎