-
Notifications
You must be signed in to change notification settings - Fork 0
feat: Add conversation variable persistence layer #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: coderabbit_combined_20260121_augment_sentry_coderabbit_1_base_feat_add_conversation_variable_persistence_layer__pr429
Are you sure you want to change the base?
Conversation
… factory to pass the ConversationVariableUpdater factory (the only non-VariablePool dependency), plus a unit test to verify the injection path. - `api/core/workflow/nodes/variable_assigner/v2/node.py` adds a kw-only `conv_var_updater_factory` dependency (defaulting to `conversation_variable_updater_factory`) and stores it for use in `_run`. - `api/core/workflow/nodes/node_factory.py` now injects the factory when creating VariableAssigner v2 nodes. - `api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py` adds a test asserting the factory is injected. Tests not run. Next steps (optional): 1) `make lint` 2) `make type-check` 3) `uv run --project api --dev dev/pytest/pytest_unit_tests.sh`
…ructor args. - `api/core/workflow/nodes/node_factory.py` now directly instantiates `VariableAssignerNode` with the injected dependency, and uses a direct call for all other nodes. No tests run.
Add a new command for GraphEngine to update a group of variables. This command takes a group of variable selectors and new values. When the engine receives the command, it will update the corresponding variable in the variable pool. If it does not exist, it will add it; if it does, it will overwrite it. Both behaviors should be treated the same and do not need to be distinguished.
…be-kanban 0941477f) Create a new persistence layer for the Graph Engine. This layer receives a ConversationVariableUpdater upon initialization, which is used to persist the received ConversationVariables to the database. It can retrieve the currently processing ConversationId from the engine's variable pool. It captures the successful execution event of each node and determines whether the type of this node is VariableAssigner(v1 and v2). If so, it retrieves the variable name and value that need to be updated from the node's outputs. This layer is only used in the Advanced Chat. It should be placed outside of Core.Workflow package.
…rs/conversation_variable_persist_layer.py` to satisfy SIM118 - chore(lint): run `make lint` (passes; warnings about missing RECORD during venv package uninstall) - chore(type-check): run `make type-check` (fails: 1275 errors for missing type stubs like `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`)
…tType validation and casting - test(graph-engine): update VariableUpdate usages to include value_type in command tests
… drop common_helpers usage - refactor(variable-assigner-v2): inline updated variable payload and drop common_helpers usage Tests not run.
…n and remove value type validation - test(graph-engine): update UpdateVariablesCommand tests to pass concrete Variable instances - fix(graph-engine): align VariableUpdate values with selector before adding to VariablePool Tests not run.
…e handling for v1/v2 process_data - refactor(app-layer): read updated variables from process_data in conversation variable persistence layer - test(app-layer): adapt persistence layer tests to use common_helpers updated-variable payloads Tests not run.
…nce reads from process_data
…fter venv changes) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs across dependencies) Details: - `make lint` fails with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…ableUnion and remove value type validation" This reverts commit 5ebc87a.
…h SegmentType validation and casting" This reverts commit 3edd525.
This reverts commit 67007f6.
…y out of core.workflow into `api/services/conversation_variable_updater.py` - refactor(app): update advanced chat app runner and conversation service to import the new updater factory Tests not run.
…-linter module missing) - chore(type-check): run `make type-check` (fails: 1275 missing type stubs) Details: - `make lint` reports: `No matches for ignored import core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database` and ends with `ModuleNotFoundError: No module named 'dotenv_linter.cli'`. - `make type-check` fails with missing type stubs for `opentelemetry`, `click`, `sqlalchemy`, `flask`, `pydantic`, `pydantic_settings`, etc.
…impl import in `api/.importlinter`
WalkthroughThis pull request refactors conversation variable persistence by migrating the responsibility from VariableAssignerNode to a new ConversationVariablePersistenceLayer. It also refactors the ReadOnlyVariablePool API to accept a Sequence-based selector instead of separate node_id and variable_key parameters. Updates include removals of ConversationVariableUpdater integration from variable assigner nodes, service module reorganization, and comprehensive test coverage. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@api/services/conversation_variable_updater.py`:
- Around line 14-23: The update method creates a Session(db.engine) without
cleanup; change it to use a context manager so the session is always
closed/rolled back on exceptions: wrap the body of update in "with
Session(db.engine) as session:" and perform the select/row lookup, assignment to
row.data (using ConversationVariable and variable.model_dump_json()), and
session.commit() inside that block so ConversationVariableNotFoundError is still
raised when no row is found and resources are cleaned up automatically.
🧹 Nitpick comments (3)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (1)
106-111: Event-driven verification pattern looks good.The test correctly transitions from direct updater mocking to event-based verification, aligning with the new architecture where
ConversationVariablePersistenceLayerhandles persistence.One consideration: using
next()without a default will raiseStopIterationif noNodeRunSucceededEventis found. This is acceptable for tests since a missing event indicates a test failure, but you could make the assertion more explicit:♻️ Optional: More explicit event assertion
events = list(node.run()) - succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent)) + succeeded_events = [event for event in events if isinstance(event, NodeRunSucceededEvent)] + assert len(succeeded_events) == 1, f"Expected exactly one NodeRunSucceededEvent, got {len(succeeded_events)}" + succeeded_event = succeeded_events[0]api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (2)
54-54: Usedatetime.now(timezone.utc)instead of deprecateddatetime.utcnow().
datetime.utcnow()is deprecated since Python 3.12. While this is test code, it's good practice to use the modern API.♻️ Suggested fix
+from datetime import datetime, timezone -from datetime import datetime ... - start_at=datetime.utcnow(), + start_at=datetime.now(timezone.utc),
18-31: Consider extractingMockReadOnlyVariablePoolto a shared test utility.This mock class is duplicated in
test_pause_state_persist_layer.py. Extracting it to a shared test fixtures module would reduce duplication and ensure consistency.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
api/.importlinterapi/core/app/apps/advanced_chat/app_runner.pyapi/core/app/layers/conversation_variable_persist_layer.pyapi/core/workflow/nodes/node_factory.pyapi/core/workflow/nodes/variable_assigner/v1/node.pyapi/core/workflow/nodes/variable_assigner/v2/node.pyapi/core/workflow/runtime/graph_runtime_state_protocol.pyapi/core/workflow/runtime/read_only_wrappers.pyapi/services/conversation_service.pyapi/services/conversation_variable_updater.pyapi/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.pyapi/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.pyapi/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.pyapi/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py
💤 Files with no reviewable changes (2)
- api/core/workflow/nodes/node_factory.py
- api/.importlinter
🧰 Additional context used
🧬 Code graph analysis (8)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (5)
api/core/app/layers/conversation_variable_persist_layer.py (2)
ConversationVariablePersistenceLayer(14-56)on_event(22-53)api/core/workflow/enums.py (1)
NodeType(36-85)api/core/workflow/graph_events/node.py (1)
NodeRunSucceededEvent(38-39)api/core/workflow/runtime/graph_runtime_state_protocol.py (6)
ReadOnlyGraphRuntimeState(25-83)system_variable(35-35)get(12-14)variable_pool(38-40)outputs(58-60)start_at(43-45)api/core/workflow/system_variable.py (3)
SystemVariable(11-118)conversation_id(159-160)as_view(117-118)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py (2)
api/core/workflow/graph_events/node.py (1)
NodeRunSucceededEvent(38-39)api/core/workflow/nodes/base/node.py (1)
run(267-349)
api/core/app/layers/conversation_variable_persist_layer.py (6)
api/core/workflow/enums.py (1)
NodeType(36-85)api/core/workflow/graph_engine/layers/base.py (1)
GraphEngineLayer(16-112)api/core/workflow/graph_events/node.py (1)
NodeRunSucceededEvent(38-39)api/core/workflow/runtime/graph_runtime_state_protocol.py (3)
system_variable(35-35)variable_pool(38-40)get(12-14)api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1)
get(22-25)api/services/conversation_variable_updater.py (1)
update(14-23)
api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (4)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1)
get(22-25)api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
get(12-14)api/core/workflow/runtime/read_only_wrappers.py (1)
get(21-24)api/core/workflow/runtime/graph_runtime_state.py (1)
get(24-26)
api/core/workflow/runtime/read_only_wrappers.py (3)
api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (1)
get(22-25)api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
get(12-14)api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1)
get(70-78)
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py (1)
api/core/workflow/nodes/variable_assigner/v2/node.py (1)
VariableAssignerNode(53-248)
api/core/app/apps/advanced_chat/app_runner.py (3)
api/core/app/layers/conversation_variable_persist_layer.py (1)
ConversationVariablePersistenceLayer(14-56)api/services/conversation_variable_updater.py (1)
conversation_variable_updater_factory(29-30)api/core/workflow/graph_engine/graph_engine.py (1)
layer(215-218)
api/services/conversation_service.py (1)
api/services/conversation_variable_updater.py (1)
conversation_variable_updater_factory(29-30)
🔇 Additional comments (13)
api/services/conversation_variable_updater.py (1)
25-26: Explicit no-op flush is fine.
Clarifies the updater interface for callers that expect a flush hook.api/services/conversation_service.py (1)
20-20: Import update aligns with new service location.api/core/app/layers/conversation_variable_persist_layer.py (2)
22-36: Event filtering and early exits look solid.
Clear guards on event type, node type, runtime state, and conversation_id keep the layer focused and safe.
38-53: GraphEngine isolates layer exceptions—no risk of workflow abort.The
on_event()call in GraphEngine's event manager (event_management/event_manager.py) is wrapped in a try/except block that catches and logs all exceptions without re-raising them. IfConversationVariableUpdater.update()raisesConversationVariableNotFoundError, it will be caught, logged, and the workflow will continue normally.api/core/workflow/nodes/variable_assigner/v1/node.py (1)
1-2: Imports updated cleanly.api/core/workflow/nodes/variable_assigner/v2/node.py (1)
3-68: Constructor wiring is consistent and explicit.
The explicit__init__and TYPE_CHECKING imports keep initialization aligned with v1 and improve clarity.api/core/app/apps/advanced_chat/app_runner.py (1)
23-45: Conversation-variable persistence layer wiring looks good.
Layer insertion and factory usage are clear and consistent with the new service location.Also applies to: 204-205
api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py (1)
395-431: LGTM! Test validates the factory-to-node creation pathway.The test correctly verifies that
DifyNodeFactorycan instantiate aVariableAssignerNodefrom a graph node definition, which aligns with the updated v2 node constructor that acceptsgraph_init_paramsandgraph_runtime_state.api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py (1)
70-78: LGTM! Mock correctly implements the updated selector-based API.The
getmethod signature change from(node_id, variable_key)to(selector: Sequence[str])aligns with the updatedReadOnlyVariablePoolprotocol. The guard clause forlen(selector) < 2provides safe handling of edge cases.api/core/workflow/runtime/graph_runtime_state_protocol.py (1)
12-14: LGTM! Clean API change to selector-based variable retrieval.The signature change from
(node_id: str, variable_key: str)to(selector: Sequence[str], /)provides a more flexible and unified access pattern. The positional-only parameter (/) enforces a consistent calling convention.api/core/workflow/runtime/read_only_wrappers.py (1)
21-24: LGTM! Wrapper correctly implements the updated protocol.The implementation properly delegates to the underlying
VariablePool.get(selector)while maintaining the defensive copy behavior.api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py (2)
63-85: LGTM! Comprehensive test for conversation variable persistence.The test correctly validates that:
- The layer persists conversation variables when processing
NodeRunSucceededEventfromVARIABLE_ASSIGNERnodes- The updater receives the correct
conversation_idandvariableflush()is called after the update
88-144: Good coverage of edge cases.The tests appropriately verify that the layer skips persistence when:
- Outputs/process_data are missing
- The node type is not
VARIABLE_ASSIGNER- The variable selector doesn't target conversation variables
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| def update(self, conversation_id: str, variable: Variable) -> None: | ||
| stmt = select(ConversationVariable).where( | ||
| ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id | ||
| ) | ||
| with Session(db.engine) as session: | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise VariableOperatorNodeError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() | ||
|
|
||
| def flush(self): | ||
| session = Session(db.engine) | ||
| row = session.scalar(stmt) | ||
| if not row: | ||
| raise ConversationVariableNotFoundError("conversation variable not found in the database") | ||
| row.data = variable.model_dump_json() | ||
| session.commit() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, check if the file exists and view the current code
fd conversation_variable_updater.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 179
🏁 Script executed:
# Search for other session usage patterns in the codebase to understand conventions
rg "Session\(db\.engine\)" --type py -A 5Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 50395
🏁 Script executed:
# Check if there's any session management pattern already established
rg "with Session" --type py -A 3 | head -50Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 3650
🌐 Web query:
SQLAlchemy 2.x session lifecycle management context manager best practices
💡 Result:
Concise best-practices for SQLAlchemy 2.x session lifecycle (context-manager focused)
- Create a single sessionmaker (bound to the Engine) at module/app scope and use it to produce Sessions; don’t instantiate Sessions inside low-level business logic. [1][2]
- Scope Sessions to a short, explicit block using context managers so close() is automatic:
- Use Session() as a context manager to ensure .close() runs. [1][3]
- Use session.begin() (or Session.begin() from a sessionmaker) as a nested context to get automatic begin→commit and rollback on exception. Example: with Session.begin() as session: ... . [1][2][3]
- Prefer explicit transaction boundaries over implicit usage:
- Rely on session.begin() (or the session.begin() context) rather than letting multiple functions implicitly trigger autobegin. Consider disabling autobegin (Session(..., autobegin=False)) if you must force explicit begin points. [1][2]
- Use begin_nested() for per-item savepoints (e.g., catch IntegrityError on individual rows without aborting the outer transaction). [3]
- Handle resources and errors:
- Context managers will commit on success and rollback on exceptions; they also close/reset the Session when leaving scope. Avoid manual try/except around commit/rollback when using the context manager. [1][2]
- Be mindful of object state after commit:
- Objects are expired by default on commit (expire_on_commit); set or override if you need detached objects to retain loaded state. [1][2]
- For frameworks/requests:
- Manage the Session lifecycle at the request/module level (open at request start, begin transaction as needed, commit/rollback and close at request end). Keep data-access functions accept a Session parameter instead of creating one. [2]
- Other notes:
- session.close() will rollback any open transaction and release connections; use it (or the context manager) to avoid leaked connections. [1][3]
Sources: SQLAlchemy 2.x docs — Session API & Session Basics; Transactions & session transaction patterns. [1][2][3]
References
[1] SQLAlchemy — Session API / sessionmaker.begin() / context manager usage.
[2] SQLAlchemy — Session Basics (scoping, autobegin, expire_on_commit; recommended pattern to pass session into functions).
[3] SQLAlchemy — Transactions & Session.begin_nested(), explicit begin, close/rollback behavior.
🏁 Script executed:
# View the specific file to confirm current code state
sed -n '14,23p' api/services/conversation_variable_updater.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 589
🏁 Script executed:
# Check if there are other instances of bare Session() without context manager in this file
rg "Session\(db\.engine\)" api/services/conversation_variable_updater.py -A 10Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 393
Wrap session creation in a context manager to ensure proper resource cleanup.
Session(db.engine) is created without any cleanup mechanism. If an exception occurs before session.commit(), the session remains open and the connection is leaked back to the pool. Use with Session(db.engine) as session: to automatically close the session and rollback any open transaction, matching the established pattern throughout the codebase.
🔧 Suggested fix
- session = Session(db.engine)
- row = session.scalar(stmt)
- if not row:
- raise ConversationVariableNotFoundError("conversation variable not found in the database")
- row.data = variable.model_dump_json()
- session.commit()
+ with Session(db.engine) as session:
+ row = session.scalar(stmt)
+ if not row:
+ raise ConversationVariableNotFoundError("conversation variable not found in the database")
+ row.data = variable.model_dump_json()
+ session.commit()🤖 Prompt for AI Agents
In `@api/services/conversation_variable_updater.py` around lines 14 - 23, The
update method creates a Session(db.engine) without cleanup; change it to use a
context manager so the session is always closed/rolled back on exceptions: wrap
the body of update in "with Session(db.engine) as session:" and perform the
select/row lookup, assignment to row.data (using ConversationVariable and
variable.model_dump_json()), and session.commit() inside that block so
ConversationVariableNotFoundError is still raised when no row is found and
resources are cleaned up automatically.
Benchmark PR from qodo-benchmark#429
Summary by CodeRabbit
Release Notes
Bug Fixes
Chores
✏️ Tip: You can customize this high-level summary in your review settings.