-
Notifications
You must be signed in to change notification settings - Fork 20
Lease handling user enhancements #671
Changes from all commits
f50c12b
41ee94c
cfbe5ff
8256580
f20fd63
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -9,7 +9,14 @@ | |||||
| from datetime import datetime, timedelta | ||||||
| from typing import Any, Self | ||||||
|
|
||||||
| from anyio import AsyncContextManagerMixin, ContextManagerMixin, create_task_group, fail_after, sleep | ||||||
| from anyio import ( | ||||||
| AsyncContextManagerMixin, | ||||||
| CancelScope, | ||||||
| ContextManagerMixin, | ||||||
| create_task_group, | ||||||
| fail_after, | ||||||
| sleep, | ||||||
| ) | ||||||
| from anyio.from_thread import BlockingPortal | ||||||
| from grpc.aio import Channel | ||||||
| from jumpstarter_protocol import jumpstarter_pb2, jumpstarter_pb2_grpc | ||||||
|
|
@@ -40,6 +47,8 @@ class Lease(ContextManagerMixin, AsyncContextManagerMixin): | |||||
| controller: jumpstarter_pb2_grpc.ControllerServiceStub = field(init=False) | ||||||
| tls_config: TLSConfigV1Alpha1 = field(default_factory=TLSConfigV1Alpha1) | ||||||
| grpc_options: dict[str, Any] = field(default_factory=dict) | ||||||
| acquisition_timeout: int = field(default=7200) # Timeout in seconds for lease acquisition, polled in 5s intervals | ||||||
| exporter_name: str = field(default="remote", init=False) # Populated during acquisition | ||||||
|
|
||||||
| def __post_init__(self): | ||||||
| if hasattr(super(), "__post_init__"): | ||||||
|
|
@@ -57,7 +66,7 @@ async def _create(self): | |||||
| duration=self.duration, | ||||||
| ) | ||||||
| ).name | ||||||
| logger.info("Created lease request for selector %s for duration %s", self.selector, self.duration) | ||||||
| logger.info("Acquiring lease %s for selector %s for duration %s", self.name, self.selector, self.duration) | ||||||
|
|
||||||
| async def get(self): | ||||||
| with translate_grpc_exceptions(): | ||||||
|
|
@@ -99,54 +108,70 @@ async def request_async(self): | |||||
| await self._create() | ||||||
| else: | ||||||
| await self._create() | ||||||
|
|
||||||
| return await self._acquire() | ||||||
|
|
||||||
| async def _acquire(self): | ||||||
| """Acquire a lease. | ||||||
|
|
||||||
| Makes sure the lease is ready, and returns the lease object. | ||||||
| """ | ||||||
| with fail_after(300): # TODO: configurable timeout | ||||||
| while True: | ||||||
| logger.debug("Polling Lease %s", self.name) | ||||||
| result = await self.get() | ||||||
| # lease ready | ||||||
| if condition_true(result.conditions, "Ready"): | ||||||
| logger.debug("Lease %s acquired", self.name) | ||||||
| return self | ||||||
| # lease unsatisfiable | ||||||
| if condition_true(result.conditions, "Unsatisfiable"): | ||||||
| message = condition_message(result.conditions, "Unsatisfiable") | ||||||
| logger.debug( | ||||||
| "Lease %s cannot be satisfied: %s", | ||||||
| self.name, | ||||||
| condition_message(result.conditions, "Unsatisfiable"), | ||||||
| ) | ||||||
| raise LeaseError(f"the lease cannot be satisfied: {message}") | ||||||
|
|
||||||
| # lease not pending | ||||||
| if condition_false(result.conditions, "Pending"): | ||||||
| raise LeaseError( | ||||||
| f"Lease {self.name} is not in pending, but it isn't in Ready or Unsatisfiable state either" | ||||||
| ) | ||||||
|
|
||||||
| # lease released | ||||||
| if condition_present_and_equal(result.conditions, "Ready", "False", "Released"): | ||||||
| raise LeaseError(f"lease {self.name} released") | ||||||
|
|
||||||
| await sleep(1) | ||||||
| try: | ||||||
| with fail_after(self.acquisition_timeout): | ||||||
| while True: | ||||||
| logger.debug("Polling Lease %s", self.name) | ||||||
| result = await self.get() | ||||||
| # lease ready | ||||||
| if condition_true(result.conditions, "Ready"): | ||||||
| logger.debug("Lease %s acquired", self.name) | ||||||
| self.exporter_name = result.exporter | ||||||
| return self | ||||||
| # lease unsatisfiable | ||||||
| if condition_true(result.conditions, "Unsatisfiable"): | ||||||
| message = condition_message(result.conditions, "Unsatisfiable") | ||||||
| logger.debug("Lease %s cannot be satisfied: %s", self.name, message) | ||||||
| raise LeaseError(f"the lease cannot be satisfied: {message}") | ||||||
|
|
||||||
| # lease invalid | ||||||
| if condition_true(result.conditions, "Invalid"): | ||||||
| message = condition_message(result.conditions, "Invalid") | ||||||
| logger.debug("Lease %s is invalid: %s", self.name, message) | ||||||
| raise LeaseError(f"the lease is invalid: {message}") | ||||||
|
|
||||||
| # lease not pending | ||||||
| if condition_false(result.conditions, "Pending"): | ||||||
| raise LeaseError( | ||||||
| f"Lease {self.name} is not in pending, but it isn't in Ready or Unsatisfiable state either" | ||||||
| ) | ||||||
|
|
||||||
| # lease released | ||||||
| if condition_present_and_equal(result.conditions, "Ready", "False", "Released"): | ||||||
| raise LeaseError(f"lease {self.name} released") | ||||||
|
|
||||||
| await sleep(5) | ||||||
| except TimeoutError: | ||||||
| logger.debug(f"Lease {self.name} acquisition timed out after {self.acquisition_timeout} seconds") | ||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It will be obvious from the exception anyway.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was trying to compare both the debug and the normal output, and IMHO it makes more sense like this: where the "Error: lease...." is red. I.e., the end-user error message is already there, just through the LeaseError handler. |
||||||
| raise LeaseError( | ||||||
| f"lease {self.name} acquisition timed out after {self.acquisition_timeout} seconds" | ||||||
| ) from None | ||||||
|
|
||||||
| @asynccontextmanager | ||||||
| async def __asynccontextmanager__(self) -> AsyncGenerator[Self]: | ||||||
| value = await self.request_async() | ||||||
| try: | ||||||
| value = await self.request_async() | ||||||
| yield value | ||||||
| finally: | ||||||
| if self.release: | ||||||
| if self.release and self.name: | ||||||
| logger.info("Releasing Lease %s", self.name) | ||||||
| await self.svc.DeleteLease( | ||||||
| name=self.name, | ||||||
| ) | ||||||
| # Shield cleanup from cancellation to ensure it completes | ||||||
| with CancelScope(shield=True): | ||||||
| try: | ||||||
| with fail_after(30): | ||||||
| await self.svc.DeleteLease( | ||||||
| name=self.name, | ||||||
| ) | ||||||
| except TimeoutError: | ||||||
| logger.warning("Timeout while deleting lease %s during cleanup", self.name) | ||||||
|
|
||||||
| @contextmanager | ||||||
| def __contextmanager__(self) -> Generator[Self]: | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,7 +59,7 @@ def launch_shell( | |
|
|
||
| Args: | ||
| host: The jumpstarter host path | ||
| context: The context of the shell ("local" or "remote") | ||
| context: The context of the shell (e.g. "local" or exporter name) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Has this really changed? I think it's local/remote.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but it could make sense to set the exporter name instead of remote :D that would be cool
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has been ideated by AI! 🌊
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So that's all contained in the fairly small commit jumpstarter-dev/jumpstarter@62d1c9e, or do you mean something else? |
||
| allow: List of allowed drivers | ||
| unsafe: Whether to allow drivers outside of the allow list | ||
| """ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,16 @@ def decode_unsafe(self) -> Self: | |
| return self | ||
|
|
||
|
|
||
| class ClientConfigV1Alpha1Lease(BaseSettings): | ||
| """Configuration for lease operations.""" | ||
|
|
||
| acquisition_timeout: int = Field( | ||
| default=7200, | ||
| description="Timeout in seconds for lease acquisition", | ||
| ge=5, # Must be at least 5 seconds (polling interval) | ||
| ) | ||
|
|
||
|
|
||
| class ClientConfigV1Alpha1(BaseSettings): | ||
| CLIENT_CONFIGS_PATH: ClassVar[Path] = CONFIG_PATH / "clients" | ||
|
|
||
|
|
@@ -108,6 +118,8 @@ class ClientConfigV1Alpha1(BaseSettings): | |
|
|
||
| shell: ShellConfigV1Alpha1 = Field(default_factory=ShellConfigV1Alpha1) | ||
|
|
||
| leases: ClientConfigV1Alpha1Lease = Field(default_factory=ClientConfigV1Alpha1Lease) | ||
|
|
||
| async def channel(self): | ||
| if self.endpoint is None or self.token is None: | ||
| raise ConfigurationError("endpoint or token not set in client config") | ||
|
|
@@ -258,6 +270,7 @@ async def lease_async( | |
| release=release_lease, | ||
| tls_config=self.tls, | ||
| grpc_options=self.grpcOptions, | ||
| acquisition_timeout=self.leases.acquisition_timeout, | ||
| ) as lease: | ||
|
Comment on lines
271
to
274
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain: Confirm Lease(...) supports acquisition_timeout. Wiring looks right; verify that the Lease constructor accepts acquisition_timeout and that its units (seconds) match the API. 🏁 Script executed:
#!/bin/bash
# Verify Lease.__init__ signature and usages for acquisition_timeout (seconds)
rg -nC2 -P 'class\s+Lease\b' packages
rg -nC3 -P '__init__\s*\(.*acquisition_timeout' packages
rg -nC2 -P 'acquisition_timeout\s*=' packages
Length of output: 1187
Fix parameter mismatch: 🤖 Prompt for AI Agents |
||
| yield lease | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reauth can fail if config wasn’t attached; harden and align token check.
If get_config() returns None (e.g., upstream didn’t tag the exception), login_func(config) may fail. Also, this block checks "expired" case-insensitively, while the upstream decorator checks "token is expired" case-sensitively, risking missed tagging.
Suggested changes:
Pair with the client-side change below to ensure config is always attached.
📝 Committable suggestion
🤖 Prompt for AI Agents