Add prune and re-enqueue signal functionality #322
NiveditJain merged 20 commits into exospherehost:main
Conversation
- Introduced new routes for pruning states and re-enqueuing states after a specified time.
- Added corresponding controller functions to handle the logic for pruning and re-enqueuing states, including validation and error handling.
- Created new signal models for request and response structures related to pruning and re-enqueuing operations.
- Updated the State model to include a new field for enqueue_after, enhancing state management capabilities.
- Enhanced logging for better traceability of operations related to state management.
Walkthrough
Adds two SDK signal exceptions (PruneSignal, ReQueueAfterSignal) and exposes them publicly; runtime posts these signals to new state-manager signal endpoints. State-manager gains signal models, controllers, and routes, a PRUNED status, state data/enqueue_after fields and index, enqueue selection honoring enqueue_after, plus tests and docs.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Node
participant Runtime
participant "State Manager API" as SM
rect #eef7ff
Note over Node,Runtime: Node executes inside runtime
Node->>Runtime: execute node
Runtime->>Runtime: run node logic
end
alt PruneSignal raised
Runtime->>SM: POST /v0/namespace/{ns}/states/{id}/prune\nheaders: x-api-key\nbody: { data }
SM-->>Runtime: 200 { status: PRUNED, enqueue_after }
else ReQueueAfterSignal raised
Runtime->>SM: POST /v0/namespace/{ns}/states/{id}/re-enqueue-after\nbody: { enqueue_after }
SM-->>Runtime: 200 { status: CREATED, enqueue_after }
else No signal
Runtime->>SM: POST notify executed/errored (existing endpoints)
SM-->>Runtime: 2xx
end
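The two signal branches in the diagram can be sketched as plain request builders. This is a minimal illustration, not the SDK's code: `SignalRequest`, `build_prune_request`, and `build_requeue_request` are hypothetical names, the `x-api-key` header on the re-enqueue call is an assumption (the diagram shows it only for prune), and the positive-delay check reflects validation added later in this PR.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class SignalRequest:
    """Hypothetical container for one outgoing signal call (not an SDK type)."""
    url: str
    headers: Dict[str, str]
    body: Dict[str, Any]


def build_prune_request(base_uri: str, namespace: str, state_id: str,
                        api_key: str, data: Dict[str, Any]) -> SignalRequest:
    # Path, header, and body shape follow the diagram's PruneSignal branch.
    url = f"{base_uri}/v0/namespace/{namespace}/states/{state_id}/prune"
    return SignalRequest(url, {"x-api-key": api_key}, {"data": data})


def build_requeue_request(base_uri: str, namespace: str, state_id: str,
                          api_key: str, delay_ms: int) -> SignalRequest:
    # Assumption: the delay is sent in milliseconds and must be positive.
    if delay_ms <= 0:
        raise ValueError("delay_ms must be positive")
    url = f"{base_uri}/v0/namespace/{namespace}/states/{state_id}/re-enqueue-after"
    return SignalRequest(url, {"x-api-key": api_key}, {"enqueue_after": delay_ms})
```

Keeping the URL and payload construction separate from the HTTP call makes the path shape (`states` vs `state`) easy to unit-test without a running state manager.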
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Summary of Changes
Hello @NiveditJain, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces significant enhancements to the state management system by providing new mechanisms to control the lifecycle of states. It enables states to be explicitly marked as 'pruned' or to be re-enqueued after a specified delay, offering greater flexibility in handling state transitions and retries. These changes improve the system's ability to manage complex workflows and ensure states are processed efficiently.
Highlights
- New API Endpoints for State Management: Introduced new API endpoints for managing state lifecycle: /states/{state_id}/prune and /states/{state_id}/re-enqueue-after.
- Controller Logic for Pruning and Re-enqueuing: Implemented controller logic (prune_signal, re_queue_after_signal) to handle state pruning and delayed re-enqueuing, including validation and error handling.
- New Signal Models: Defined new Pydantic models (SignalResponseModel, PruneRequestModel, ReEnqueueAfterRequestModel) for request and response structures related to these new signal operations.
- State Model Enhancements: Extended the State database model with a data field for additional state information and an enqueue_after timestamp to control when a state becomes eligible for re-enqueuing.
- New State Status: Added PRUNED as a new status to the StateStatusEnum to reflect the new state lifecycle.
- Database Index Optimizations: Optimized database queries by adding new indexes on enqueue_after, status, namespace_name, and node_name fields in the State collection.
Code Review
This pull request introduces functionality to prune and re-enqueue states via new API endpoints. The changes include new controller logic, model updates for states and signals, and database index additions. My review focuses on improving security, correctness, and performance.
Key feedback points include:
- Security: The new signal handlers should scope database queries by namespace to prevent cross-namespace data modification.
- Correctness: The logic for re-enqueuing seems to calculate the new timestamp based on the old one, which might lead to unexpected behavior; it should likely be based on the current time. Also, there's redundant code in the new routes and controllers.
- Performance: The new database indexes can be consolidated into a more efficient compound index.
- Clarity: Some model field descriptions are ambiguous and should be clarified.
Overall, the changes are a good addition, but the identified issues, particularly around security and correctness, should be addressed before merging.
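The security and correctness points above can be illustrated with a small in-memory sketch. It is not the controller code from this PR: `find_state_scoped` and `next_enqueue_after` are hypothetical helpers, and states are modeled as plain dicts rather than database documents.

```python
import time
from typing import Dict, List, Optional


def find_state_scoped(states: List[Dict], namespace: str, state_id: str) -> Optional[Dict]:
    """Return a state only if BOTH the id and the namespace match."""
    for state in states:
        if state["id"] == state_id and state["namespace_name"] == namespace:
            return state
    return None  # A caller would translate this into a 404.


def next_enqueue_after(delay_ms: int, now_ms: Optional[int] = None) -> int:
    """Compute the new timestamp from the current time, not from the old value."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return now_ms + delay_ms
```

Matching on the namespace as well as the id means a caller holding a key for one namespace cannot modify a state in another, and basing the timestamp on "now" keeps repeated re-enqueues from drifting relative to a stale value.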
- Corrected the spelling of "Recieved" to "Received" in logging statements within the prune_signal and re_queue_after_signal functions for improved clarity.
- Fixed a typo in the comment header from "singnals" to "signals" in the routes.py file, enhancing code readability.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
- Corrected the import path for the re_queue_after_signal controller in routes.py.
- Introduced a new controller function in re_queue_after_signal.py to handle re-queuing of states, including error handling and logging.
- Updated the State model to consolidate indexing for enqueue_after and status fields, improving database query performance.
…ime-related functionality.
- Introduced comprehensive tests for the new prune and re-enqueue signal routes, validating request models and response handling.
- Implemented model validation tests for PruneRequestModel and ReEnqueueAfterRequestModel, ensuring correct data handling.
- Added controller tests for prune_signal and re_queue_after_signal functions, covering success and error scenarios.
- Enhanced SignalResponseModel tests to verify correct serialization and deserialization of responses.
- Improved overall test coverage for state management operations, ensuring robustness and reliability.

- Removed unused SignalResponseModel import from test files for prune_signal and re_queue_after_signal.
- Streamlined test dependencies to improve clarity and maintainability.

- Added new signals: PruneSingal and ReQueueAfterSingal to handle state management operations.
- Updated Runtime class to include methods for constructing endpoints for pruning and requeuing states.
- Enhanced error handling in the Runtime class to manage new signal exceptions.
- Bumped version to 0.0.2b2 to reflect the addition of new features.

- Updated test_package_init.py to include new expected exports for PruneSingal and ReQueueAfterSingal.
- Created test_signals_and_runtime_functions.py to implement comprehensive unit tests for PruneSingal and ReQueueAfterSingal, covering initialization, sending, and error handling.
- Enhanced test coverage for signal handling in the Runtime class, ensuring robust integration with state management operations.
Actionable comments posted: 42
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (6)
python-sdk/exospherehost/runtime.py (2)
262-273: HTTP error handling should raise to trip circuit.
Non-blocking suggestion: consider raising on non-200 to surface back-pressure and avoid silent drops. Example:
- if response.status != 200:
-     logger.error(f"Failed to notify executed state {state_id}: {res}")
+ if response.status != 200:
+     raise RuntimeError(f"notify executed {state_id} failed: {response.status} {res}")
Apply similarly to errored path.
Also applies to: 287-292
378-421: Consider reusing a shared aiohttp session across the runtime.
You open/close a ClientSession per request; reuse per process to reduce connection overhead.
Direction: create self._session in _start(), pass to helpers, and close on shutdown.
python-sdk/tests/test_package_init.py (1)
20-25: Overly strict __all__ assertion; make it a subset check.
This fails when new legitimate exports are added.
- for export in __all__:
-     assert export in expected_exports, f"Unexpected export: {export}"
+ unexpected = set(__all__) - set(expected_exports)
+ assert not unexpected, f"Unexpected exports: {sorted(unexpected)}"
state-manager/app/controller/enqueue_states.py (2)
15-24: Pick earliest due state deterministically and compute now once.
Add a stable sort by enqueue_after (then _id) and avoid repeated time() calls.
 async def find_state(namespace_name: str, nodes: list[str]) -> State | None:
-    data = await State.get_pymongo_collection().find_one_and_update(
+    now_ms = int(time.time() * 1000)
+    data = await State.get_pymongo_collection().find_one_and_update(
         {
             "namespace_name": namespace_name,
             "status": StateStatusEnum.CREATED,
             "node_name": { "$in": nodes },
-            "enqueue_after": {"$lte": int(time.time() * 1000)}
+            "enqueue_after": {"$lte": now_ms}
         },
         {
             "$set": {"status": StateStatusEnum.QUEUED}
         },
+        sort=[("enqueue_after", 1), ("_id", 1)],
         return_document=ReturnDocument.AFTER
     )
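The selection rule suggested in this comment (filter on equality fields, require `enqueue_after <= now`, then take the earliest by `enqueue_after` with `_id` as a tie-breaker) can be sketched over an in-memory list. This is only an illustration of the ordering: `pick_due_state` is a hypothetical helper, statuses are plain strings, and unlike `find_one_and_update` it is not atomic.

```python
from typing import Dict, List, Optional


def pick_due_state(states: List[Dict], namespace: str,
                   nodes: List[str], now_ms: int) -> Optional[Dict]:
    """Pick the earliest due CREATED state and mark it QUEUED (non-atomic sketch)."""
    due = [
        s for s in states
        if s["namespace_name"] == namespace
        and s["status"] == "CREATED"
        and s["node_name"] in nodes
        and s["enqueue_after"] <= now_ms  # `now_ms` is computed once by the caller
    ]
    if not due:
        return None
    # Stable, deterministic choice: earliest enqueue_after, then smallest _id.
    chosen = min(due, key=lambda s: (s["enqueue_after"], s["_id"]))
    chosen["status"] = "QUEUED"
    return chosen
```

States whose `enqueue_after` lies in the future are simply skipped until a later poll, which is how a re-enqueue delay takes effect.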
32-66: Batch gather is OK; consider metrics for count and lag.
Emit counters and enqueue lag (now_ms - state.enqueue_after) to monitor scheduling health.
state-manager/app/models/db/state.py (1)
50-57: insert_many: ensure pre-insert hooks won’t be skipped elsewhere.
Beanie’s insert_many can bypass events/validation; you’re compensating by calling _generate_fingerprint. Consider guarding it to only run when does_unites is True and add a test for bulk insert correctness.
- for state in documents:
-     state._generate_fingerprint()
+ for state in documents:
+     if state.does_unites:
+         state._generate_fingerprint()
Would you like a quick test added for bulk fingerprint generation?
♻️ Duplicate comments (3)
state-manager/app/routes.py (1)
166-171: Redundant API key checks; dependency already enforces 401.
These branches are unreachable if the dependency fails; remove to reduce noise. This repeats prior feedback.
- if api_key:
-     logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
- else:
-     logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
-     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
+ logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
state-manager/tests/unit/test_routes.py (1)
731-736: These 401-path tests are artifacts of redundant API key branches.
Once the route drops the manual if/else, keep 401 tests at dependency level (FastAPI integration tests) instead of calling the handler directly with None.
Also applies to: 773-779
state-manager/app/models/db/state.py (1)
71-79: Fix compound index order for the enqueue query (range last).
Current order puts the range field first, which prevents optimal use of equality filters and will hurt the query that matches on namespace_name, status, node_name with an enqueue_after ≤ now. Use equality fields first, range last; also align the name with prior guidance.
- IndexModel(
-     [
-         ("enqueue_after", 1),
-         ("status", 1),
-         ("namespace_name", 1),
-         ("node_name", 1),
-     ],
-     name="idx_enqueue_after"
- )
+ IndexModel(
+     [
+         ("namespace_name", 1),
+         ("status", 1),
+         ("node_name", 1),
+         ("enqueue_after", 1),
+     ],
+     name="idx_enqueue_query"
+ )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (17)
- python-sdk/exospherehost/__init__.py (1 hunks)
- python-sdk/exospherehost/_version.py (1 hunks)
- python-sdk/exospherehost/runtime.py (3 hunks)
- python-sdk/exospherehost/signals.py (1 hunks)
- python-sdk/tests/test_package_init.py (1 hunks)
- python-sdk/tests/test_signals_and_runtime_functions.py (1 hunks)
- state-manager/app/controller/enqueue_states.py (2 hunks)
- state-manager/app/controller/prune_signal.py (1 hunks)
- state-manager/app/controller/re_queue_after_signal.py (1 hunks)
- state-manager/app/models/db/state.py (3 hunks)
- state-manager/app/models/signal_models.py (1 hunks)
- state-manager/app/models/state_status_enum.py (1 hunks)
- state-manager/app/routes.py (2 hunks)
- state-manager/tests/unit/controller/test_prune_signal.py (1 hunks)
- state-manager/tests/unit/controller/test_re_queue_after_signal.py (1 hunks)
- state-manager/tests/unit/models/test_signal_models.py (1 hunks)
- state-manager/tests/unit/test_routes.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (11)
state-manager/app/controller/prune_signal.py (3)
- state-manager/app/models/signal_models.py (2): PruneRequestModel (10-11), SignalResponseModel (6-8)
- state-manager/app/models/db/state.py (1): State (12-80)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
python-sdk/exospherehost/__init__.py (1)
- python-sdk/exospherehost/signals.py (2): PruneSingal (5-33), ReQueueAfterSingal (36-67)
state-manager/app/controller/re_queue_after_signal.py (3)
- state-manager/app/models/signal_models.py (2): ReEnqueueAfterRequestModel (13-14), SignalResponseModel (6-8)
- state-manager/app/models/db/state.py (1): State (12-80)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
state-manager/app/models/signal_models.py (1)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
state-manager/tests/unit/controller/test_prune_signal.py (4)
- state-manager/app/controller/prune_signal.py (1): prune_signal (11-32)
- state-manager/app/models/signal_models.py (1): PruneRequestModel (10-11)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
- state-manager/tests/unit/controller/test_re_queue_after_signal.py (3): mock_request_id (15-16), mock_namespace (19-20), mock_state_id (23-24)
python-sdk/exospherehost/runtime.py (2)
- python-sdk/exospherehost/signals.py (4): PruneSingal (5-33), ReQueueAfterSingal (36-67), send (19-33), send (50-67)
- state-manager/app/controller/prune_signal.py (1): prune_signal (11-32)
state-manager/tests/unit/models/test_signal_models.py (1)
- state-manager/app/models/signal_models.py (3): PruneRequestModel (10-11), ReEnqueueAfterRequestModel (13-14), SignalResponseModel (6-8)
python-sdk/tests/test_signals_and_runtime_functions.py (2)
- python-sdk/exospherehost/signals.py (2): PruneSingal (5-33), ReQueueAfterSingal (36-67)
- python-sdk/exospherehost/runtime.py (11): Runtime (46-461), _setup_default_logging (15-43), _get_prune_endpoint (164-168), _get_requeue_after_endpoint (170-174), _need_secrets (372-376), _get_secrets (294-313), _get_secrets_endpoint (158-162), _notify_executed (254-271), _get_executed_endpoint (140-144), _notify_errored (274-291), _get_errored_endpoint (146-150)
state-manager/app/routes.py (4)
- state-manager/app/models/signal_models.py (3): SignalResponseModel (6-8), PruneRequestModel (10-11), ReEnqueueAfterRequestModel (13-14)
- state-manager/app/controller/prune_signal.py (1): prune_signal (11-32)
- state-manager/app/controller/re_queue_after_signal.py (1): re_queue_after_signal (12-30)
- state-manager/app/utils/check_secret.py (1): check_api_key (11-16)
state-manager/tests/unit/controller/test_re_queue_after_signal.py (3)
- state-manager/app/controller/re_queue_after_signal.py (1): re_queue_after_signal (12-30)
- state-manager/app/models/signal_models.py (1): ReEnqueueAfterRequestModel (13-14)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
state-manager/tests/unit/test_routes.py (3)
- state-manager/app/models/signal_models.py (3): PruneRequestModel (10-11), ReEnqueueAfterRequestModel (13-14), SignalResponseModel (6-8)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
- state-manager/app/routes.py (4): enqueue_state (63-73), trigger_graph_route (83-93), prune_state_route (163-172), re_enqueue_after_state_route (182-191)
🔇 Additional comments (18)
python-sdk/exospherehost/_version.py (1)
1-1: Ensure version is exposed and release notes are updated
- Verify python-sdk/exospherehost/__init__.py imports version from _version.py and consistently exposes it via __version__/VERSION.
- No changelog or docs entry found for 0.0.2b2; please add a release note or CHANGELOG entry.
python-sdk/exospherehost/signals.py (1)
50-67: Enhance ReQueueAfter.send with timeouts, diagnostics, and correct payload
- Accept an optional ClientSession (with ClientTimeout) and only close it when owned to avoid unbounded requests.
- On non-200 responses, include the response body in the thrown exception for easier debugging.
- Fix the typo in the diff: use self.timedelta (or rename the attribute consistently) instead of self.delay.
- Verify with the server contract whether the payload should remain {"enqueue_after": …} in milliseconds or switch to ISO-8601 (e.g. next_retry_at).
python-sdk/exospherehost/runtime.py (3)
164-175: Endpoint path inconsistency: singular “state” vs plural “states”
Unable to locate the server’s route definitions in this repo; please confirm whether the prune and re-enqueue endpoints use states/{state_id} and update accordingly to avoid 404s:
- return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/state/{state_id}/prune"
+ return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/states/{state_id}/prune"
@@
- return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/state/{state_id}/re-enqueue-after"
+ return f"{self._state_manager_uri}/{self._state_manager_version}/namespace/{self._namespace}/states/{state_id}/re-enqueue-after"
11-11: No import change needed until classes are renamed.
The current imports in runtime.py correctly reference the existing class names in signals.py (which still use the “Singal” typo). First rename in signals.py:
-class PruneSingal(Exception):
+class PruneSignal(Exception):
-class ReQueueAfterSingal(Exception):
+class RequeueAfterSignal(Exception):
then update the import in runtime.py:
-from .signals import PruneSingal, ReQueueAfterSingal
+from .signals import PruneSignal, RequeueAfterSignal
Likely an incorrect or invalid review comment.
298-314: Secrets endpoint path matches server definition
Confirmed server route at state-manager/app/routes.py:252 uses /state/{state_id}/secrets, matching the client.
python-sdk/exospherehost/__init__.py (1)
41-41: Align exported signals with issue #288 (Skip/TryAfter); current code exports PruneSingal / ReQueueAfterSingal (typo: “Singal”).
- Findings (from repo): python-sdk/exospherehost/signals.py defines PruneSingal and ReQueueAfterSingal; __init__.py exports them; runtime.py imports/catches them; tests (python-sdk/tests/test_package_init.py, test_signals_and_runtime_functions.py) expect/use those names.
- Action: confirm intended API. Either
  - Rename to the issue names (Skip, TryAfter) AND fix the “Singal” typo → update signals.py, exospherehost/__init__.py, runtime.py, all tests and public docs; or
  - Keep Prune/ReQueue names and update/close issue #288 (and document the semantic mapping).
- Also confirm whether the misspelling “Singal” should be corrected to “Signal” in the public API.
state-manager/tests/unit/controller/test_re_queue_after_signal.py (2)
259-294: Verify allowed source statuses for re-enqueue; tests currently enforce CREATED for all.
Spec in #288 implies TryAfter from an attempt should not introduce new public names and should respect attempt/backoff semantics. Resetting SUCCESS/EXECUTED/PRUNED back to CREATED may be invalid.
Confirm policy and adjust controller/tests accordingly (e.g., allow from {CREATED, ERRORED}, 409 otherwise).
58-73: Good coverage of happy path and field effects.
Mocks, time patching, and assertions on state/save calls are solid.
state-manager/app/controller/re_queue_after_signal.py (2)
14-21: 404 handling is correct and logged.
Lookup + 404 works as expected.
22-26: Allow negative delays, optionally extract now, verify status policy
- Retain current behavior permitting body.enqueue_after < 0 (tests at test_re_queue_after_signal.py:188–192 expect negative offsets).
- For readability you may hoist now_ms = int(time.time() * 1000) before setting state.enqueue_after.
- Confirm that unconditionally resetting state.status = CREATED from any prior status aligns with the intended TryAfter retry/backoff policy (#288) and idempotency requirements.
state-manager/app/controller/prune_signal.py (1)
16-16: Scoped lookup by namespace is correct.
Fetching by both id and namespace enforces tenant isolation. Good.
state-manager/tests/unit/test_routes.py (2)
694-717: Route-level tests look good.
Mocks validate the handler-to-controller wiring and parameter shapes.
156-162: Route description should reflect “prune” or be removed.
Tests should expect the corrected description (“State pruned successfully”) after the route fix.
state-manager/app/models/db/state.py (4)
21-21: Field addition looks good.
Adding a general-purpose data dict on State is reasonable and non-breaking.
28-49: Fingerprint hook logic is fine.
Partial unique index only applies when does_unites=True; setting empty fingerprint otherwise is safe.
60-69: Partial unique index on fingerprint is correct.
Scope-limiting by does_unites avoids blocking non-union states. LGTM.
18-18: Verify no new public statuses are introduced.
Per acceptance, no new public state names; Skip must record EXECUTED with metadata.skipped=true, and TryAfter must set ERRORED with next_retry_at. Please confirm StateStatusEnum hasn’t added PRUNED or other statuses.
python-sdk/tests/test_signals_and_runtime_functions.py (1)
116-173: Milliseconds conversion tests are solid.
Good edge coverage for sub-second rounding.
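For readers unfamiliar with what such conversion tests exercise, here is a minimal sketch of turning a `timedelta` into whole milliseconds. It is an assumption-laden illustration: `delay_to_ms` is a hypothetical helper, and the SDK's actual rounding rule is not shown in this thread, so truncation toward zero for sub-millisecond remainders is only one possible choice.

```python
from datetime import timedelta


def delay_to_ms(delay: timedelta) -> int:
    """Convert a delay to whole milliseconds, rejecting non-positive results."""
    # Floor-dividing one timedelta by another yields an int.
    ms = delay // timedelta(milliseconds=1)
    if ms <= 0:
        # Assumption: mirrors the positive-delay validation added later in this PR.
        raise ValueError("delay must be at least one millisecond")
    return ms
```

For example, `delay_to_ms(timedelta(seconds=1.5))` yields 1500 and `delay_to_ms(timedelta(minutes=2))` yields 120000, while a 500-microsecond delay is rejected under this rule.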
- Renamed `PruneSingal` and `ReQueueAfterSingal` to `PruneSignal` and `ReQueueAfterSignal` across the codebase for consistency.
- Updated exception handling in the `Runtime` class to reflect the new signal names.
- Adjusted test cases to use the corrected signal names, ensuring comprehensive coverage for the updated functionality.
- Enhanced model validation in `ReEnqueueAfterRequestModel` and `State` to enforce positive duration constraints for re-enqueueing operations.

- Updated expected exports in test_package_init.py to reflect the correct signal name `PruneSignal` and `ReQueueAfterSignal`.
- Modified test_requeue_signal_with_zero_timedelta in test_signals_and_runtime_functions.py to raise an exception when initialized with zero timedelta, improving error handling for the ReQueueAfterSignal class.
- Removed unnecessary assertions in the test to streamline the test logic.

- Updated tests for ReEnqueueAfterRequestModel to raise exceptions for zero and negative delay values, ensuring stricter validation.
- Adjusted test cases in related signal tests to reflect the new validation rules, improving overall test reliability and coverage.
nk-ag left a comment
questions on implementation choices, spell checks and function consistencies
- Introduced a new documentation file for Signals, detailing their functionality, usage, and examples for `PruneSignal` and `ReQueueAfterSignal`.
- Updated mkdocs.yml to include the new Signals documentation in the navigation structure, enhancing accessibility for users.

- Modified the status check in the prune_signal function to ensure that the state must be QUEUED before proceeding with pruning, enhancing the validation logic and error handling for state management operations.
Actionable comments posted: 17
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
python-sdk/exospherehost/runtime.py (4)
221-247: Minor: log volume in enqueue loop.
“Enqueued states: N” every poll can be noisy. Consider debug level or only when N>0.
183-211: f-string inside f-string is fine; consider precomputing list.
Small readability nit; optional.
220-233: Add request timeout to state-manager calls.
aiohttp defaults can hang; use ClientTimeout or a shared session with timeouts.
299-314: Secrets fetch: return type is Dict[str, str] but error paths return {}.
OK, but log once at warn level to avoid log spam on repeated failures.
♻️ Duplicate comments (28)
state-manager/app/models/db/state.py (1)
70-79: Compound index key order is suboptimal for the enqueue query; put equality filters first, then the range.
Scheduler queries typically filter by namespace_name, status, node_name and range on enqueue_after. Current index starts with enqueue_after, which prevents optimal use of the equality prefixes. This was flagged earlier; still applicable.
Apply:
- IndexModel(
-     [
-         ("enqueue_after", 1),
-         ("status", 1),
-         ("namespace_name", 1),
-         ("node_name", 1),
-     ],
-     name="idx_enqueue_after"
- )
+ IndexModel(
+     [
+         ("namespace_name", 1),
+         ("status", 1),
+         ("node_name", 1),
+         ("enqueue_after", 1),
+     ],
+     name="idx_enqueue_query"
+ )
state-manager/tests/unit/controller/test_re_queue_after_signal.py (1)
165-183: Validate precise error types; prefer asserting HTTP 422 at the boundary.
Model enforces gt=0; assert pydantic.ValidationError here, and add a route-level test that the API returns 422 for 0/negative delays. Docstring also contradicts behavior.
Apply:
- """Test re-enqueuing with negative delay (should still work)"""
+ """Test invalid delays are rejected by validation"""
@@
- with pytest.raises(Exception):
+ from pydantic import ValidationError
+ with pytest.raises(ValidationError):
      ReEnqueueAfterRequestModel(enqueue_after=-5000) # Negative delay
@@
- with pytest.raises(Exception):
+ with pytest.raises(ValidationError):
      ReEnqueueAfterRequestModel(enqueue_after=0)
And consider adding a FastAPI route test to expect 422.
python-sdk/tests/test_package_init.py (1)
18-18: Update expected exports to spec-compliant names, or accept both for compatibility.
Per #288, prefer SkipSignal/TryAfterSignal. If you retain legacy names, expand the expected set.
Apply (spec-first):
- expected_exports = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION", "PruneSignal", "ReQueueAfterSignal"]
+ expected_exports = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION", "SkipSignal", "TryAfterSignal"]
Compatibility option:
- expected_exports = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION", "PruneSignal", "ReQueueAfterSignal"]
+ expected_exports = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION", "SkipSignal", "TryAfterSignal", "PruneSignal", "ReQueueAfterSignal"]
python-sdk/exospherehost/runtime.py (2)
411-421: Signal handling can crash worker; also diverges from spec (must be attempt-scoped completion).
- If send() raises, the exception originates inside the except block and escapes this try/except, killing the worker. Wrap and degrade gracefully (log + _notify_errored).
- Spec from #288 requires POST /states/{id}/complete with a signal payload and idempotency by (state_id, attempt_id); bespoke /prune and /re-enqueue-after bypass that contract.
Minimal crash-proofing:
- except PruneSignal as prune_signal:
+ except PruneSignal as prune_signal:
      logger.info(f"Pruning state {state['state_id']} for node {node.__name__ if node else "unknown"}")
-     await prune_signal.send(self._get_prune_endpoint(state["state_id"]), self._key) # type: ignore
-     logger.info(f"Pruned state {state['state_id']} for node {node.__name__ if node else "unknown"}")
+     try:
+         await prune_signal.send(self._get_prune_endpoint(state["state_id"]), self._key) # type: ignore
+         logger.info(f"Pruned state {state['state_id']} for node {node.__name__ if node else "unknown"}")
+     except Exception as se:
+         logger.error(f"Prune signal failed for state {state['state_id']}: {se}")
+         await self._notify_errored(state["state_id"], f"prune_signal_failed: {se}")
@@
- except ReQueueAfterSignal as requeue_signal:
+ except ReQueueAfterSignal as requeue_signal:
      logger.info(f"Requeuing state {state['state_id']} for node {node.__name__ if node else "unknown"} after {requeue_signal.delay}")
-     await requeue_signal.send(self._get_requeue_after_endpoint(state["state_id"]), self._key) # type: ignore
-     logger.info(f"Requeued state {state['state_id']} for node {node.__name__ if node else "unknown"} after {requeue_signal.delay}")
+     try:
+         await requeue_signal.send(self._get_requeue_after_endpoint(state["state_id"]), self._key) # type: ignore
+         logger.info(f"Requeued state {state['state_id']} for node {node.__name__ if node else "unknown"} after {requeue_signal.delay}")
+     except Exception as se:
+         logger.error(f"Requeue-after signal failed for state {state['state_id']}: {se}")
+         await self._notify_errored(state["state_id"], f"requeue_after_signal_failed: {se}")
Direction (spec-aligned): introduce _get_complete_endpoint and _complete_with_signal(state_id, attempt_id, payload) and route Skip/TryAfter through it with idempotency. Happy to draft this if you confirm.
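The failure mode in this comment (an exception raised by `send()` inside the `except` block escaping the worker) can be reproduced and contained in a few lines. The sketch below is self-contained and uses stand-ins: this `PruneSignal` class, its `fail` flag, `run_node`, and the `notify_errored` callback are hypothetical and only mimic the shape of the runtime code under review.

```python
import asyncio
from typing import Awaitable, Callable


class PruneSignal(Exception):
    """Stand-in for the SDK signal; `fail` simulates an unreachable state manager."""

    def __init__(self, fail: bool = False):
        super().__init__("prune")
        self.fail = fail

    async def send(self, endpoint: str, key: str) -> None:
        if self.fail:
            raise ConnectionError("state manager unreachable")


async def run_node(node: Callable[[], Awaitable[None]], state_id: str,
                   notify_errored: Callable[[str, str], Awaitable[None]]) -> str:
    try:
        await node()
        return "executed"
    except PruneSignal as signal:
        try:
            # Without this inner try, a failing send() would propagate out of
            # run_node and terminate the worker loop that called it.
            await signal.send(f"/states/{state_id}/prune", "api-key")
            return "pruned"
        except Exception as exc:
            await notify_errored(state_id, f"prune_signal_failed: {exc}")
            return "errored"
```

The worker stays alive in all three outcomes, and a failed signal is reported through the same error path as any other node failure.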
170-174: Name consistency: “requeue” vs “re-enqueue”.Earlier feedback asked for “requeue” consistently; route uses “re-enqueue-after”. Pick one and apply everywhere to avoid public API churn.
state-manager/app/models/signal_models.py (2)
10-11: Clarify prune payload semantics.
-class PruneRequestModel(BaseModel):
-    data: dict[str, Any] = Field(..., description="Data of the state")
+class PruneRequestModel(BaseModel):
+    data: dict[str, Any] = Field(..., description="Prune metadata (e.g., reason, context)")
6-14: Diverges from #288: signals should be Skip/TryAfter via completion, no new statuses.
Current models cement PRUNED and separate signal routes. Spec requires a discriminated union on completion with attempt-scoped idempotency. Consider reshaping the models accordingly.
docs/docs/exosphere/signals.md (1)
78-81: Docs contradict #288 acceptance (no new statuses).
Spec: Skip is recorded as EXECUTED with metadata.skipped=true; TryAfter is recorded as ERRORED with next_retry_at. The doc says PRUNED/CREATED. Align examples and lifecycle text.
state-manager/app/routes.py (2)
156-173: Prune route: redundant API key check, missing strong typing, spec divergence.
- check_api_key dependency already 401s; remove if/else branch.
- Let FastAPI validate state_id as PydanticObjectId.
- Public route diverges from #288 (signals via completion + idempotent).
-async def prune_state_route(namespace_name: str, state_id: str, body: PruneRequestModel, request: Request, api_key: str = Depends(check_api_key)):
+async def prune_state_route(namespace_name: str, state_id: PydanticObjectId, body: PruneRequestModel, request: Request, api_key: str = Depends(check_api_key)):
@@
-    if api_key:
-        logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
-    else:
-        logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
-        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
-
-    return await prune_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)
+    logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
+    return await prune_signal(namespace_name, state_id, body, x_exosphere_request_id)
175-191: Re-enqueue route: same issues as prune; also naming drift (“re-enqueue” vs “requeue”).
Apply the same API key/state_id fixes; consider unified naming.
-async def re_enqueue_after_state_route(namespace_name: str, state_id: str, body: ReEnqueueAfterRequestModel, request: Request, api_key: str = Depends(check_api_key)):
+async def re_enqueue_after_state_route(namespace_name: str, state_id: PydanticObjectId, body: ReEnqueueAfterRequestModel, request: Request, api_key: str = Depends(check_api_key)):
@@
-    if api_key:
-        logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
-    else:
-        logger.error(f"API key is invalid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
-        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key")
-
-    return await re_queue_after_signal(namespace_name, PydanticObjectId(state_id), body, x_exosphere_request_id)
+    logger.info(f"API key is valid for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id)
+    return await re_queue_after_signal(namespace_name, state_id, body, x_exosphere_request_id)
python-sdk/exospherehost/signals.py (5)
15-17: Mutable default argument; use None and copy.
-    def __init__(self, data: dict[str, Any] = {}):
-        self.data = data
+    def __init__(self, data: dict[str, Any] | None = None):
+        self.data = dict(data) if data is not None else {}
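To illustrate why the shared default matters, here is a minimal sketch. `SharedDefaultSignal` and `SafeSignal` are hypothetical stand-ins written for this example, not the SDK's `PruneSignal`; they only reproduce the two constructor shapes from the diff.

```python
from __future__ import annotations

from typing import Any


class SharedDefaultSignal(Exception):
    """Hypothetical stand-in that reproduces the mutable-default pitfall."""

    def __init__(self, data: dict[str, Any] = {}):  # one dict shared by every call
        self.data = data


class SafeSignal(Exception):
    """Hypothetical stand-in using the None-and-copy pattern from the suggestion."""

    def __init__(self, data: dict[str, Any] | None = None):
        # Copy so that neither the default nor the caller's dict is aliased.
        self.data = dict(data) if data is not None else {}


first = SharedDefaultSignal()
first.data["reason"] = "invalid_data"
second = SharedDefaultSignal()
leaked = second.data  # sees "reason": both instances share the default dict

safe_first = SafeSignal()
safe_first.data["reason"] = "invalid_data"
safe_second = SafeSignal()
isolated = safe_second.data  # a fresh, empty dict
```

With the shared default, metadata attached to one signal can leak into a later, unrelated signal; the copy in `SafeSignal` also protects the caller's dict from later mutation.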
30-34: Network call lacks timeout, session reuse, and diagnostics.
-from aiohttp import ClientSession
+from aiohttp import ClientSession, ClientTimeout
@@
-    async def send(self, endpoint: str, key: str):
+    async def send(self, endpoint: str, key: str, session: ClientSession | None = None):
@@
-        async with ClientSession() as session:
-            async with session.post(endpoint, json=self.data, headers={"x-api-key": key}) as response:
-                if response.status != 200:
-                    raise Exception(f"Failed to send prune signal to {endpoint}")
+        _owns = False
+        if session is None:
+            session = ClientSession(timeout=ClientTimeout(total=10))
+            _owns = True
+        try:
+            async with session.post(endpoint, json=self.data, headers={"x-api-key": key}) as response:
+                if response.status != 200:
+                    text = await response.text()
+                    raise Exception(f"PruneSignal POST {endpoint} failed: {response.status} {text}")
+        finally:
+            if _owns:
+                await session.close()
65-71: Same networking concerns for ReQueueAfterSignal.send().
Mirror timeout/session reuse and better error text.
-        async with ClientSession() as session:
-            async with session.post(endpoint, json=body, headers={"x-api-key": key}) as response:
-                if response.status != 200:
-                    raise Exception(f"Failed to send requeue after signal to {endpoint}")
+        from aiohttp import ClientTimeout
+        _owns = False
+        if 'session' not in locals() or session is None:
+            session = ClientSession(timeout=ClientTimeout(total=10))
+            _owns = True
+        try:
+            async with session.post(endpoint, json=body, headers={"x-api-key": key}) as response:
+                if response.status != 200:
+                    text = await response.text()
+                    raise Exception(f"ReQueueAfter POST {endpoint} failed: {response.status} {text}")
+        finally:
+            if _owns:
+                await session.close()
5-14: SDK surface diverges from #288 (“Skip/TryAfter” via completion; include attempt_id).
Direction:
- Provide SkipSignal(reason, outputs?) and TryAfterSignal(delay_s?, retry_at?, reason?).
- send() should target /states/{id}/complete with payload {attempt_id, signal:{...}} for idempotency.
- Include reason in logs/metrics.
Also applies to: 36-45
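As a rough illustration of the direction above, the sketch below builds a `{attempt_id, signal: {...}}` payload and shows what idempotency keyed on (state_id, attempt_id) could mean on the receiving side. Everything here is an assumption made for illustration: `build_completion_payload` and `InMemoryCompletions` are hypothetical names, and the in-memory dict stands in for whatever storage the state manager would actually use.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


def build_completion_payload(attempt_id: str, signal_type: str, reason: str) -> dict[str, Any]:
    """Hypothetical helper: wrap a signal in the {attempt_id, signal} envelope."""
    return {"attempt_id": attempt_id, "signal": {"type": signal_type, "reason": reason}}


@dataclass
class InMemoryCompletions:
    """Hypothetical receiver that applies each (state_id, attempt_id) at most once."""

    _seen: dict[tuple[str, str], dict[str, Any]] = field(default_factory=dict)

    def complete(self, state_id: str, payload: dict[str, Any]) -> tuple[dict[str, Any], bool]:
        key = (state_id, payload["attempt_id"])
        if key in self._seen:
            # Duplicate delivery: return the first recorded signal, apply nothing.
            return self._seen[key], False
        self._seen[key] = payload["signal"]
        return payload["signal"], True


store = InMemoryCompletions()
payload = build_completion_payload("attempt-1", "SKIP", "invalid_data")
first_result, first_applied = store.complete("state-123", payload)
retry_result, retry_applied = store.complete("state-123", payload)  # e.g. a client retry
```

The point of the envelope is that a retried POST carrying the same `attempt_id` is recognised and answered with the original outcome rather than transitioning the state a second time.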
49-53: Wrong variable in message and exception type; validate delay properly.
Use ValueError and interpolate the actual delay, not the timedelta type.
-        if self.delay.total_seconds() <= 0:
-            raise Exception("Delay must be greater than 0")
+        if self.delay.total_seconds() <= 0:
+            raise ValueError("delay must be greater than 0 seconds")
@@
-        super().__init__(f"ReQueueAfter signal received with timedelta: {timedelta} \n NOTE: Do not catch this Exception, let it bubble up to Runtime for handling at StateManager")
+        super().__init__(f"ReQueueAfter signal received with delay: {self.delay} \n NOTE: Do not catch this Exception, let it bubble up to Runtime for handling at StateManager")
state-manager/tests/unit/test_routes.py (3)
32-33: Assert exact route path membership (avoid substring false positives).
Use direct membership checks to prevent masking typos.
-    assert any('/v0/namespace/{namespace_name}/states/{state_id}/prune' in path for path in paths)
-    assert any('/v0/namespace/{namespace_name}/states/{state_id}/re-enqueue-after' in path for path in paths)
+    assert f'/v0/namespace/{{namespace_name}}/states/{{state_id}}/prune' in paths
+    assert f'/v0/namespace/{{namespace_name}}/states/{{state_id}}/re-enqueue-after' in paths
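A small sketch of the false positive being described: the registered path below is a hypothetical mistyped route invented for this example, yet the substring check still passes.

```python
# Hypothetical registered paths: the prune route was mistyped as "prune-all".
paths = {
    "/v0/namespace/{namespace_name}/states/{state_id}/prune-all",
}
expected = "/v0/namespace/{namespace_name}/states/{state_id}/prune"

# Substring check: passes because "…/prune" is a prefix of "…/prune-all".
substring_match = any(expected in path for path in paths)

# Exact membership: fails, which is what exposes the typo.
exact_match = expected in paths
```

Exact membership works with any container of path strings (list, set, or tuple), so the stricter assertion costs nothing.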
780-813: Fix mock assertion inside loop; assert each call or reset the mock. Also avoid PRUNED.
assert_called_with inside a loop only checks the last invocation; either reset per-iteration or assert the call list. And avoid PRUNED.
-    expected_response = SignalResponseModel(
-        status=StateStatusEnum.PRUNED,
-        enqueue_after=1234567890
-    )
+    expected_response = SignalResponseModel(
+        status=StateStatusEnum.EXECUTED,
+        enqueue_after=1234567890
+    )
@@
-    mock_prune_signal.assert_called_with("test_namespace", PydanticObjectId(state_id), prune_request, "test-request-id")
+    mock_prune_signal.assert_called_with("test_namespace", PydanticObjectId(state_id), prune_request, "test-request-id")
+    mock_prune_signal.reset_mock()
Or collect expected_calls and use assert_has_calls.
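The `assert_has_calls` alternative can be sketched with the standard library alone. The mock and state ids below are placeholders for this example, not the fixtures used in test_routes.py.

```python
from unittest.mock import MagicMock, call

mock_prune_signal = MagicMock()
state_ids = ["state-a", "state-b", "state-c"]  # hypothetical ids for the example

for state_id in state_ids:
    mock_prune_signal("test_namespace", state_id)

# assert_called_with compares against the most recent call only.
mock_prune_signal.assert_called_with("test_namespace", "state-c")

# assert_has_calls checks that the recorded history contains these calls in order.
expected_calls = [call("test_namespace", s) for s in state_ids]
mock_prune_signal.assert_has_calls(expected_calls)

call_count = mock_prune_signal.call_count
```

Comparing `call_args_list` to `expected_calls` directly is stricter still, because it also rejects unexpected extra calls.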
814-844: Fix mock assertion inside loop for re-enqueue; validate every invocation.
Reset the mock each iteration or assert the full call history.
-    mock_re_queue_after_signal.assert_called_with("test_namespace", PydanticObjectId(state_id), re_enqueue_request, "test-request-id")
+    mock_re_queue_after_signal.assert_called_with("test_namespace", PydanticObjectId(state_id), re_enqueue_request, "test-request-id")
+    mock_re_queue_after_signal.reset_mock()
state-manager/tests/unit/models/test_signal_models.py (2)
114-131: Use strict integer semantics; don’t allow silent coercion from str/float.
Recommend conint(strict=True) in models; update tests to expect ValidationError for "5000" and 5000.0.
-    delay = "5000"
-    model = ReEnqueueAfterRequestModel(enqueue_after=delay) # type: ignore
-    assert model.enqueue_after == 5000
+    from pydantic import ValidationError
+    with pytest.raises(ValidationError):
+        ReEnqueueAfterRequestModel(enqueue_after="5000") # type: ignore
@@
-    delay = 5000.0
-    model = ReEnqueueAfterRequestModel(enqueue_after=delay) # type: ignore
-    assert model.enqueue_after == 5000
+    from pydantic import ValidationError
+    with pytest.raises(ValidationError):
+        ReEnqueueAfterRequestModel(enqueue_after=5000.0) # type: ignore
Model (outside this file):
- enqueue_after: int = Field(..., gt=0, description="Duration in milliseconds to delay the re-enqueuing of the state") + enqueue_after: conint(strict=True, gt=0) = Field(..., description="Delay (ms) before re-enqueue")
223-236: Limit enum assertions to statuses reachable via signals; remove internal-only or new values.
Acceptance: no new public statuses; don’t assert PRUNED/NEXT_CREATED_ERROR here.
-    all_statuses = [
-        StateStatusEnum.CREATED,
-        StateStatusEnum.QUEUED,
-        StateStatusEnum.EXECUTED,
-        StateStatusEnum.ERRORED,
-        StateStatusEnum.CANCELLED,
-        StateStatusEnum.SUCCESS,
-        StateStatusEnum.NEXT_CREATED_ERROR,
-        StateStatusEnum.PRUNED
-    ]
+    all_statuses = [
+        StateStatusEnum.EXECUTED, # Skip
+        StateStatusEnum.ERRORED, # TryAfter schedules retry
+        StateStatusEnum.CREATED, # TryAfter may requeue as created
+    ]
python-sdk/tests/test_signals_and_runtime_functions.py (8)
201-209: Use plural path segment “states”.
Align with existing executed/errored endpoints.
-    expected_prune = "http://test-state-manager/v0/namespace/test-namespace/state/test-state-id/prune"
+    expected_prune = "http://test-state-manager/v0/namespace/test-namespace/states/test-state-id/prune"
@@
-    expected_requeue = "http://test-state-manager/v0/namespace/test-namespace/state/test-state-id/re-enqueue-after"
+    expected_requeue = "http://test-state-manager/v0/namespace/test-namespace/states/test-state-id/re-enqueue-after"
361-388: Normalize endpoints to “/states/…”.
Keep endpoint builders consistent.
-    expected = "http://test-state-manager/v0/namespace/test-namespace/state/state-123/prune"
+    expected = "http://test-state-manager/v0/namespace/test-namespace/states/state-123/prune"
@@
-    expected = "http://test-state-manager/v0/namespace/test-namespace/state/state-456/re-enqueue-after"
+    expected = "http://test-state-manager/v0/namespace/test-namespace/states/state-456/re-enqueue-after"
389-417: Constructor arg typo: state_manage_version → state_manager_version.
Fix tests and Runtime.__init__ to avoid silent misconfigurations.
-    state_manage_version="v1"
+    state_manager_version="v1"
@@
-    state_manage_version="v2"
+    state_manager_version="v2"
Companion change (runtime.py):
-    def __init__(..., state_manage_version: str = "v0", ...):
+    def __init__(..., state_manager_version: str = "v0", ...):
@@
-        self._state_manager_version = state_manage_version
+        self._state_manager_version = state_manager_version
671-690: Fix false-positive logging assertion; patch logging.basicConfig.
Asserting on mock_root_logger.basicConfig doesn’t verify anything. Patch logging.basicConfig and assert not called.
-    with patch('logging.getLogger') as mock_get_logger:
+    with patch('logging.getLogger') as mock_get_logger, \
+         patch('logging.basicConfig') as mock_basic_config:
@@
-        # This should return early and not configure logging
-        _setup_default_logging()
-
-        # Verify no basic config was called
-        mock_root_logger.basicConfig = MagicMock()
-        assert not mock_root_logger.basicConfig.called
+        _setup_default_logging()
+        mock_basic_config.assert_not_called()
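A self-contained sketch of the suggested test shape follows. `setup_default_logging` is a hypothetical stand-in written for this example; it assumes the real `_setup_default_logging` returns early when the root logger already has handlers, which the PR itself does not show.

```python
import logging
from unittest.mock import MagicMock, patch


def setup_default_logging() -> None:
    """Hypothetical stand-in: configure logging only if no handlers exist yet."""
    if logging.getLogger().handlers:
        return  # assumed early-return condition
    logging.basicConfig(level=logging.INFO)


with patch("logging.getLogger") as mock_get_logger, \
     patch("logging.basicConfig") as mock_basic_config:
    mock_root_logger = MagicMock()
    mock_root_logger.handlers = [MagicMock()]  # pretend a handler is already installed
    mock_get_logger.return_value = mock_root_logger

    setup_default_logging()

    # This assertion targets the function that would actually be called,
    # so it fails if the early return is ever removed.
    mock_basic_config.assert_not_called()
    basic_config_called = mock_basic_config.called
```

By contrast, assigning a fresh `MagicMock` to `mock_root_logger.basicConfig` after the call and then checking `.called` is always false, regardless of what the function did.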
310-313: Secrets path should also use plural “states”.
Keep consistency across runtime endpoints.
-    runtime._get_secrets_endpoint("test-state-id"),
+    runtime._get_secrets_endpoint("test-state-id"),
Companion change (runtime.py):
-    return f".../namespace/{self._namespace}/state/{state_id}/secrets"
+    return f".../namespace/{self._namespace}/states/{state_id}/secrets"
456-466: Production-like endpoints should use plural “states”.
Keep parity across all expectations.
-    expected_prune_endpoint = "https://api.exosphere.host/v1/namespace/production/state/prod-state-456/prune"
+    expected_prune_endpoint = "https://api.exosphere.host/v1/namespace/production/states/prod-state-456/prune"
@@
-    expected_requeue_endpoint = "https://api.exosphere.host/v1/namespace/production/state/prod-state-789/re-enqueue-after"
+    expected_requeue_endpoint = "https://api.exosphere.host/v1/namespace/production/states/prod-state-789/re-enqueue-after"
492-501: Pluralize “state” → “states” in matrixed endpoint tests.
-    expected_prune = f"{uri}/{version}/namespace/{namespace}/state/test-state/prune"
+    expected_prune = f"{uri}/{version}/namespace/{namespace}/states/test-state/prune"
@@
-    expected_requeue = f"{uri}/{version}/namespace/{namespace}/state/test-state/re-enqueue-after"
+    expected_requeue = f"{uri}/{version}/namespace/{namespace}/states/test-state/re-enqueue-after"
201-209: Align with single completion endpoint and attempt-scoped idempotency.
SDK should POST to /states/{id}/complete with {"attempt_id": "", "signal": {...}}. Update endpoint builders and tests to assert attempt_id presence.
-    prune_endpoint = runtime._get_prune_endpoint("test-state-id")
-    expected_prune = "http://test-state-manager/v0/namespace/test-namespace/states/test-state-id/prune"
+    complete_endpoint = runtime._get_complete_endpoint("test-state-id")
+    expected_complete = "http://test-state-manager/v0/namespace/test-namespace/states/test-state-id/complete"
-    assert prune_endpoint == expected_prune
+    assert complete_endpoint == expected_complete
And when sending:
-    await signal.send(endpoint, key)
+    await runtime.send_completion(
+        state_id, attempt_id=fixed_uuid,
+        signal={"type":"SKIP","reason":"...","outputs":[...]}
+    )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (13)
- docs/docs/exosphere/signals.md (1 hunks)
- docs/mkdocs.yml (2 hunks)
- python-sdk/exospherehost/__init__.py (1 hunks)
- python-sdk/exospherehost/runtime.py (3 hunks)
- python-sdk/exospherehost/signals.py (1 hunks)
- python-sdk/tests/test_package_init.py (1 hunks)
- python-sdk/tests/test_signals_and_runtime_functions.py (1 hunks)
- state-manager/app/models/db/state.py (3 hunks)
- state-manager/app/models/signal_models.py (1 hunks)
- state-manager/app/routes.py (2 hunks)
- state-manager/tests/unit/controller/test_re_queue_after_signal.py (1 hunks)
- state-manager/tests/unit/models/test_signal_models.py (1 hunks)
- state-manager/tests/unit/test_routes.py (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (8)
state-manager/tests/unit/models/test_signal_models.py (2)
- state-manager/app/models/signal_models.py (3): PruneRequestModel (10-11), ReEnqueueAfterRequestModel (13-14), SignalResponseModel (6-8)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
python-sdk/exospherehost/__init__.py (1)
- python-sdk/exospherehost/signals.py (2): PruneSignal (5-33), ReQueueAfterSignal (36-71)
state-manager/app/routes.py (4)
- state-manager/app/models/signal_models.py (3): SignalResponseModel (6-8), PruneRequestModel (10-11), ReEnqueueAfterRequestModel (13-14)
- state-manager/app/controller/prune_signal.py (1): prune_signal (11-32)
- state-manager/app/controller/re_queue_after_signal.py (1): re_queue_after_signal (12-30)
- state-manager/app/utils/check_secret.py (1): check_api_key (11-16)
state-manager/app/models/signal_models.py (1)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
state-manager/tests/unit/test_routes.py (3)
- state-manager/app/models/signal_models.py (3): PruneRequestModel (10-11), ReEnqueueAfterRequestModel (13-14), SignalResponseModel (6-8)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
- state-manager/app/routes.py (4): enqueue_state (63-73), trigger_graph_route (83-93), prune_state_route (163-172), re_enqueue_after_state_route (182-191)
state-manager/tests/unit/controller/test_re_queue_after_signal.py (3)
- state-manager/app/controller/re_queue_after_signal.py (1): re_queue_after_signal (12-30)
- state-manager/app/models/signal_models.py (1): ReEnqueueAfterRequestModel (13-14)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
python-sdk/tests/test_signals_and_runtime_functions.py (2)
- python-sdk/exospherehost/signals.py (4): PruneSignal (5-33), ReQueueAfterSignal (36-71), send (19-33), send (54-71)
- python-sdk/exospherehost/runtime.py (11): Runtime (46-461), _setup_default_logging (15-43), _get_prune_endpoint (164-168), _get_requeue_after_endpoint (170-174), _need_secrets (372-376), _get_secrets (294-313), _get_secrets_endpoint (158-162), _notify_executed (254-271), _get_executed_endpoint (140-144), _notify_errored (274-291), _get_errored_endpoint (146-150)
python-sdk/exospherehost/runtime.py (2)
- python-sdk/exospherehost/signals.py (4): PruneSignal (5-33), ReQueueAfterSignal (36-71), send (19-33), send (54-71)
- state-manager/app/controller/prune_signal.py (1): prune_signal (11-32)
🪛 LanguageTool
docs/docs/exosphere/signals.md
[grammar] ~1-~1: Use correct spacing
Context: # Signals !!! beta "Beta Feature" Signals are ...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[style] ~2-~2: Using many exclamation marks might seem excessive (in this case: 3 exclamation marks for a text that’s 1991 characters long)
Context: # Signals !!! beta "Beta Feature" Signals are cur...
(EN_EXCESSIVE_EXCLAMATION)
[grammar] ~4-~4: Use correct spacing
Context: ...tionality may change in future releases. Signals are a mechanism in Exosphere for...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~6-~6: Use correct spacing
Context: ... states or requeuing them after a delay. ## Overview Signals are implemented as exc...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~8-~8: Use correct spacing
Context: ...queuing them after a delay. ## Overview Signals are implemented as exceptions th...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~10-~10: Use correct spacing
Context: ...manager to perform the requested action. ## Available Signals ### PruneSignal The ...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~12-~12: Use correct spacing
Context: ... requested action. ## Available Signals ### PruneSignal The PruneSignal is used t...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~14-~14: Use correct spacing
Context: .... ## Available Signals ### PruneSignal The PruneSignal is used to permanently...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~16-~16: Use correct spacing
Context: ...ent execution path should be terminated. #### Usage ```python from exospherehost impo...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~18-~18: Use correct spacing
Context: ...n path should be terminated. #### Usage python from exospherehost import PruneSignal class MyNode(BaseNode): class Inputs(BaseModel): data: str class Outputs(BaseModel): result: str async def execute(self, inputs: Inputs) -> Outputs: if inputs.data == "invalid": # Prune the state with optional data raise PruneSignal({"reason": "invalid_data", "error": "Data validation failed"}) return self.Outputs(result="processed") #### Parameters - data (dict[str, Any], op...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~38-~38: Use correct spacing
Context: ...result="processed") ``` #### Parameters - data (dict[str, Any], optional): Additional ...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~40-~40: Use correct spacing
Context: ...ration. Defaults to an empty dictionary. ### ReQueueAfterSignal The `ReQueueAfterSig...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~42-~42: Use correct spacing
Context: ...mpty dictionary. ### ReQueueAfterSignal The ReQueueAfterSignal is used to requ...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~44-~44: Use correct spacing
Context: ...ogic, scheduled tasks, or rate limiting. #### Usage ```python from exospherehost impo...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~46-~46: Use correct spacing
Context: ...led tasks, or rate limiting. #### Usage python from exospherehost import ReQueueAfterSignal from datetime import timedelta class RetryNode(BaseNode): class Inputs(BaseModel): retry_count: int data: str class Outputs(BaseModel): result: str async def execute(self, inputs: Inputs) -> Outputs: if inputs.retry_count < 3: # Requeue after 5 minutes raise ReQueueAfterSignal(timedelta(minutes=5)) return self.Outputs(result="completed") #### Parameters - delay (timedelta): The a...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~68-~68: Use correct spacing
Context: ...result="completed") ``` #### Parameters - delay (timedelta): The amount of time to wait...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~70-~70: Use correct spacing
Context: ...euing the state. Must be greater than 0. ## Important Notes 1. **Do not catch signa...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~72-~72: Use correct spacing
Context: ...t be greater than 0. ## Important Notes 1. Do not catch signals: Signals are desi...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~74-~74: Use correct spacing
Context: ...atch these exceptions in your node code. 2. Automatic handling: The runtime automa...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~76-~76: Use correct spacing
Context: ... the state manager when they are raised. 3. State lifecycle: Signals affect the st...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~78-~78: There might be a mistake here.
Context: ... state's lifecycle in the state manager: - PruneSignal: Sets state status to PRUNED - `Re...
(QB_NEW_EN)
[grammar] ~79-~79: There might be a mistake here.
Context: ... - PruneSignal: Sets state status to PRUNED - ReQueueAfterSignal: Sets state status to CREATED and sch...
(QB_NEW_EN)
[grammar] ~80-~80: Use correct spacing
Context: ...tatus to CREATED and schedules requeue ## Error Handling If signal sending fails ...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~82-~82: Use correct spacing
Context: ...and schedules requeue ## Error Handling If signal sending fails (e.g., network i...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~84-~84: Use correct spacing
Context: ...ignal will not be retried automatically. ## Examples ### Conditional Pruning ```py...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~86-~86: Use correct spacing
Context: ...t be retried automatically. ## Examples ### Conditional Pruning ```python class Val...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~88-~88: Use correct spacing
Context: ...y. ## Examples ### Conditional Pruning python class ValidationNode(BaseNode): class Inputs(BaseModel): user_id: str data: dict async def execute(self, inputs: Inputs) -> Outputs: if not self._validate_user(inputs.user_id): raise PruneSignal({ "reason": "invalid_user", "user_id": inputs.user_id, "timestamp": datetime.now().isoformat() }) return self.Outputs(validated=True) ### Polling ```python class PollingNode(Bas...
(QB_NEW_EN_OTHER_ERROR_IDS_5)
[grammar] ~107-~107: Use correct spacing
Context: ...Outputs(validated=True) ### Polling python class PollingNode(BaseNode): class Inputs(BaseModel): job_id: str async def execute(self, inputs: Inputs) -> Outputs: # Check if the job is complete job_status = await self._check_job_status(inputs.job_id) if job_status == "completed": result = await self._get_job_result(inputs.job_id) return self.Outputs(result=result) elif job_status == "failed": # Job failed, prune the state raise PruneSignal({ "reason": "job_failed", "job_id": inputs.job_id, "poll_count": inputs.poll_count }) else: # Job still running, poll again in 30 seconds raise ReQueueAfterSignal(timedelta(seconds=30)) ```
(QB_NEW_EN_OTHER_ERROR_IDS_5)
🪛 markdownlint-cli2 (0.17.2)
docs/docs/exosphere/signals.md
131-131: Trailing spaces
Expected: 0 or 2; Actual: 1
(MD009, no-trailing-spaces)
131-131: Files should end with a single newline character
(MD047, single-trailing-newline)
🔇 Additional comments (8)
state-manager/app/models/db/state.py (1)
26-26: Clarify enqueue_after naming to avoid semantic clash
- In State (app/models/db/state.py), rename enqueue_after to next_enqueue_at_ms (absolute epoch ms).
- In ReEnqueueAfterRequestModel (app/models/signal_models.py), rename enqueue_after to delay_ms (duration in ms) and in the controller (app/controller/re_queue_after_signal.py) compute next_enqueue_at_ms = current_time_ms + delay_ms.
I can draft the migration, API change and test updates once you confirm these names.
state-manager/tests/unit/controller/test_re_queue_after_signal.py (1)
296-313: LGTM: time precision coverage.
Covers fractional seconds conversion to ms correctly.
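For reference, the conversion this test exercises, together with the duration-versus-absolute split proposed for `enqueue_after` earlier in this review, might look like the sketch below. `delay_to_ms` and `next_enqueue_at_ms` are hypothetical helper names; how the SDK and controller actually perform the conversion is not shown in this thread.

```python
from datetime import timedelta


def delay_to_ms(delay: timedelta) -> int:
    """Convert a positive timedelta to whole milliseconds (hypothetical helper)."""
    if delay <= timedelta(0):
        raise ValueError("delay must be greater than 0 seconds")
    # Integer division of timedeltas avoids float rounding on fractional seconds.
    return delay // timedelta(milliseconds=1)


def next_enqueue_at_ms(current_time_ms: int, delay_ms: int) -> int:
    """Absolute epoch-ms timestamp at which the state becomes eligible again."""
    return current_time_ms + delay_ms


fractional_ms = delay_to_ms(timedelta(seconds=1.5))
eligible_at = next_enqueue_at_ms(current_time_ms=1_000_000, delay_ms=fractional_ms)
```

Keeping the duration (`delay_ms`) and the absolute timestamp (`next_enqueue_at_ms`) as separately named values makes it harder to store one where the other is expected.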
python-sdk/exospherehost/__init__.py (1)
41-45: Export SkipSignal and TryAfterSignal in the public API
Update python-sdk/exospherehost/__init__.py to alias the existing signals and expose both spec-aligned and (optionally) legacy names:
-from .signals import PruneSignal, ReQueueAfterSignal
+from .signals import PruneSignal as SkipSignal, \
+    ReQueueAfterSignal as TryAfterSignal
-__all__ = ["Runtime", "BaseNode", "StateManager", "TriggerState",
-           "VERSION", "PruneSignal", "ReQueueAfterSignal"]
+__all__ = ["Runtime", "BaseNode", "StateManager", "TriggerState",
+           "VERSION",
+           "SkipSignal", "TryAfterSignal",
+           "PruneSignal", "ReQueueAfterSignal"]
Ensure these names match the external spec and update any dependent tests or docs to accept the superset of aliases.
state-manager/tests/unit/test_routes.py (1)
114-144: Model validation tests look solid.
Covers happy-path, empty, and complex payloads.
state-manager/tests/unit/models/test_signal_models.py (2)
1-63: Good coverage of PruneRequestModel.
Validates required field, None/missing cases.
247-259: Scope enqueue_after rename across entire codebase
Renaming this field is a breaking change: verify and update the Pydantic models (app/models/signal_models.py, app/models/db/state.py), controllers (prune_signal.py, re_queue_after_signal.py), all tests (state-manager/tests/, python-sdk/tests/), routes, and the python-sdk client before proceeding.
python-sdk/tests/test_signals_and_runtime_functions.py (2)
1-43: Test scaffolding for aiohttp looks good.
Async context managers and responses are mocked correctly.
116-173: Ignore attempt_id/signal wrapper suggestion
Signal payloads correctly match state-manager models (data for prune, enqueue_after for re-enqueue) and there is no nested signal object or attempt_id field in the API spec.
Likely an incorrect or invalid review comment.
Actionable comments posted: 5
♻️ Duplicate comments (5)
state-manager/app/controller/prune_signal.py (3)
24-28: Do not introduce PRUNED; persist Skip as EXECUTED (per #288).
Replace PRUNED with EXECUTED to avoid adding a new public status. Response should mirror the persisted status.
-    state.status = StateStatusEnum.PRUNED
+    state.status = StateStatusEnum.EXECUTED
     state.data = body.data
     await state.save()
-    return SignalResponseModel(status=StateStatusEnum.PRUNED, enqueue_after=state.enqueue_after)
+    return SignalResponseModel(status=StateStatusEnum.EXECUTED, enqueue_after=state.enqueue_after)
Follow-up (outside this hunk): add a metadata field to State and set metadata.skipped=True, plus optional reason/outputs on the request model.
16-27: Make the transition atomic and idempotent-ready.
Current read-modify-write can race. Use a conditional update scoped by id, namespace, and current status; then read for enqueue_after. Prepare for attempt-scoped idempotency later.
-    state = await State.find_one(State.id == state_id, State.namespace_name == namespace_name)
-
-    if not state:
-        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found")
-
-    if state.status != StateStatusEnum.QUEUED:
-        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="State is not queued")
-
-    state.status = StateStatusEnum.PRUNED
-    state.data = body.data
-    await state.save()
+    # Atomic state transition
+    result = await State.find_one(
+        State.id == state_id,
+        State.namespace_name == namespace_name,
+        State.status == StateStatusEnum.QUEUED,
+    ).update(
+        {
+            "$set": {
+                "status": StateStatusEnum.EXECUTED,
+                "data": body.data,
+            }
+        }
+    )
+    if not result or getattr(result, "matched_count", 0) == 0:
+        # Not found or precondition failed
+        # 404 keeps parity with current behavior; consider 409 Conflict if the doc exists but status != QUEUED
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="State not found or not in QUEUED")
+
+    # Fetch updated doc for response fields
+    state = await State.find_one(State.id == state_id, State.namespace_name == namespace_name)
+    assert state is not None
Next step (outside this file): include attempt_id in the request model and upsert an attempt-scoped write to guarantee idempotency.
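The conditional-update idea can be shown without a database. In the sketch below, `InMemoryStates` is a hypothetical stand-in for the filtered update: the write happens only if the stored status still matches the precondition, and the returned count plays the role of `matched_count`. The lock models the atomicity that the database would provide for a single-document update.

```python
from __future__ import annotations

import threading
from typing import Any


class InMemoryStates:
    """Hypothetical in-memory store illustrating a compare-and-set transition."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._docs: dict[str, dict[str, Any]] = {}

    def insert(self, state_id: str, status: str) -> None:
        with self._lock:
            self._docs[state_id] = {"status": status, "data": {}}

    def update_if_status(self, state_id: str, expected_status: str, changes: dict[str, Any]) -> int:
        """Apply changes only if the current status matches; return 0 or 1."""
        with self._lock:
            doc = self._docs.get(state_id)
            if doc is None or doc["status"] != expected_status:
                return 0  # missing document or failed precondition
            doc.update(changes)
            return 1

    def get(self, state_id: str) -> dict[str, Any]:
        with self._lock:
            return dict(self._docs[state_id])


states = InMemoryStates()
states.insert("state-123", "QUEUED")

changes = {"status": "EXECUTED", "data": {"reason": "invalid_data"}}
first_matched = states.update_if_status("state-123", "QUEUED", changes)
second_matched = states.update_if_status("state-123", "QUEUED", changes)  # duplicate request
final_status = states.get("state-123")["status"]
```

Because the check and the write happen in one step, two concurrent requests cannot both observe QUEUED and both write; the loser sees a zero match count and can be answered with 404 or 409.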
30-32: Log exceptions with traceback; keep request id structured.
Use logger.exception to preserve stack traces.
-    except Exception as e:
-        logger.error(f"Error pruning state {state_id} for namespace {namespace_name}", x_exosphere_request_id=x_exosphere_request_id, error=e)
-        raise
+    except Exception:
+        logger.exception(
+            f"Error pruning state {state_id} for namespace {namespace_name}",
+            x_exosphere_request_id=x_exosphere_request_id,
+        )
+        raise
state-manager/tests/unit/controller/test_prune_signal.py (2)
63-70: Align expectations: status must be EXECUTED (no PRUNED).
Update assertions and verify the query is scoped by id and namespace.
-    assert result.status == StateStatusEnum.PRUNED
+    assert result.status == StateStatusEnum.EXECUTED
     assert result.enqueue_after == 1234567890
-    assert mock_state_created.status == StateStatusEnum.PRUNED
+    assert mock_state_created.status == StateStatusEnum.EXECUTED
     assert mock_state_created.data == mock_prune_request.data
     assert mock_state_created.save.call_count == 1
     assert mock_state_class.find_one.call_count == 1
+    # Ensure both filters are present in the lookup
+    args, _ = mock_state_class.find_one.call_args
+    assert any("id" in str(a) for a in args)
+    assert any("namespace_name" in str(a) for a in args)
96-176: Consolidate invalid-status cases via parametrization; fix misleading docstrings.
Four near-identical tests can be one parametrized test; also docstrings say QUEUED but set CREATED.
-    @patch('app.controller.prune_signal.State')
-    async def test_prune_signal_invalid_status_created(...
-    ...
-        assert exc_info.value.detail == "State is not queued"
-    ...
-    @patch('app.controller.prune_signal.State')
-    async def test_prune_signal_invalid_status_pruned(...
-    ...
-        assert exc_info.value.detail == "State is not queued"
+    @pytest.mark.parametrize("bad_status", [
+        StateStatusEnum.CREATED,
+        StateStatusEnum.EXECUTED,
+        StateStatusEnum.ERRORED,
+        StateStatusEnum.PRUNED,
+    ])
+    @patch('app.controller.prune_signal.State')
+    async def test_prune_signal_invalid_status(self, mock_state_class, mock_namespace, mock_state_id, mock_prune_request, mock_request_id, bad_status):
+        """State must be QUEUED; other statuses are invalid for pruning."""
+        mock_state = MagicMock()
+        mock_state.status = bad_status
+        mock_state_class.find_one = AsyncMock(return_value=mock_state)
+        with pytest.raises(HTTPException) as exc_info:
+            await prune_signal(mock_namespace, mock_state_id, mock_prune_request, mock_request_id)
+        assert exc_info.value.status_code == status.HTTP_400_BAD_REQUEST
+        assert "QUEUED" in exc_info.value.detail
Also applies to: 177-203
📒 Files selected for processing (2)
- state-manager/app/controller/prune_signal.py (1 hunks)
- state-manager/tests/unit/controller/test_prune_signal.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
state-manager/app/controller/prune_signal.py (3)
- state-manager/app/models/signal_models.py (2): PruneRequestModel (10-11), SignalResponseModel (6-8)
- state-manager/app/models/db/state.py (1): State (12-80)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
state-manager/tests/unit/controller/test_prune_signal.py (3)
- state-manager/app/controller/prune_signal.py (1): prune_signal (11-32)
- state-manager/app/models/signal_models.py (1): PruneRequestModel (10-11)
- state-manager/app/models/state_status_enum.py (1): StateStatusEnum (4-13)
🔇 Additional comments (2)
state-manager/app/controller/prune_signal.py (1)
16-16: Good: query is correctly scoped by id and namespace.
state-manager/tests/unit/controller/test_prune_signal.py (1)
316-319: Update expected status to EXECUTED.
-    assert result.status == StateStatusEnum.PRUNED
+    assert result.status == StateStatusEnum.EXECUTED
Likely an incorrect or invalid review comment.
Pending items:
Closes #288