Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
335bf83
Added constructor-based DI for VariableAssigner v2 and wired the node…
laipz8200 Jan 4, 2026
0fd225a
Updated to avoid the dict path and let the type checker enforce const…
laipz8200 Jan 4, 2026
67007f6
A new command for updating variables. (vibe-kanban 00377ffe)
laipz8200 Jan 4, 2026
43fefe8
Add a new persistent storage for handling Conversation Variables. (v…
laipz8200 Jan 4, 2026
89e644f
- fix(lint): replace `outputs.keys()` iteration in `api/core/app/laye…
laipz8200 Jan 4, 2026
3edd525
- feat(graph-engine): enforce variable update value types with Segmen…
laipz8200 Jan 4, 2026
638b0ef
- refactor(variable-assigner-v1): inline updated variable payload and…
laipz8200 Jan 4, 2026
5ebc87a
- refactor(graph-engine): switch VariableUpdate.value to VariableUnio…
laipz8200 Jan 5, 2026
89d292e
- refactor(variable-assigner): restore common_helpers updated-variabl…
laipz8200 Jan 5, 2026
8505033
- refactor(variable-assigner): drop updated outputs now that persiste…
laipz8200 Jan 5, 2026
36b4a6e
- chore(lint): run `make lint` (fails: dotenv-linter module missing a…
laipz8200 Jan 5, 2026
99509eb
Revert "- refactor(graph-engine): switch VariableUpdate.value to Vari…
laipz8200 Jan 5, 2026
2b044dd
Revert "- feat(graph-engine): enforce variable update value types wit…
laipz8200 Jan 5, 2026
2831694
Revert "A new command for updating variables. (vibe-kanban 00377ffe)"
laipz8200 Jan 5, 2026
b083fa2
- refactor(services): move ConversationVariableUpdaterImpl and factor…
laipz8200 Jan 5, 2026
deeebb8
- chore(lint): run `make lint` (fails: import linter error and dotenv…
laipz8200 Jan 5, 2026
1192648
- chore(import-linter): remove obsolete ignore for variable_assigner …
laipz8200 Jan 5, 2026
7c30854
fix(core): guard conversation variable flush when no updates
laipz8200 Jan 5, 2026
09b7aed
fix(core): use system variable conversation_id and add custom updater…
laipz8200 Jan 6, 2026
7b19f20
fix(services): use Exception for conversation variable not found
laipz8200 Jan 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion api/.importlinter
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ ignore_imports =
core.workflow.nodes.llm.llm_utils -> extensions.ext_database
core.workflow.nodes.llm.node -> extensions.ext_database
core.workflow.nodes.tool.tool_node -> extensions.ext_database
core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database
core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis
core.workflow.graph_engine.manager -> extensions.ext_redis
core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis
Expand Down
4 changes: 4 additions & 0 deletions api/core/app/apps/advanced_chat/app_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
QueueTextChunkEvent,
)
from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature
from core.app.layers.conversation_variable_persist_layer import ConversationVariablePersistenceLayer
from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.variables.variables import VariableUnion
Expand All @@ -40,6 +41,7 @@
from models.enums import UserFrom
from models.model import App, Conversation, Message, MessageAnnotation
from models.workflow import ConversationVariable
from services.conversation_variable_updater import conversation_variable_updater_factory

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -200,6 +202,8 @@ def run(self):
)

workflow_entry.graph_engine.layer(persistence_layer)
conversation_variable_layer = ConversationVariablePersistenceLayer(conversation_variable_updater_factory())
workflow_entry.graph_engine.layer(conversation_variable_layer)
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

Expand Down
60 changes: 60 additions & 0 deletions api/core/app/layers/conversation_variable_persist_layer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import logging

from core.variables import Variable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
from core.workflow.enums import NodeType
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events import GraphEngineEvent, NodeRunSucceededEvent
from core.workflow.nodes.variable_assigner.common import helpers as common_helpers

logger = logging.getLogger(__name__)


class ConversationVariablePersistenceLayer(GraphEngineLayer):
def __init__(self, conversation_variable_updater: ConversationVariableUpdater) -> None:
super().__init__()
self._conversation_variable_updater = conversation_variable_updater

def on_graph_start(self) -> None:
pass

def on_event(self, event: GraphEngineEvent) -> None:
if not isinstance(event, NodeRunSucceededEvent):
return
if event.node_type != NodeType.VARIABLE_ASSIGNER:
return
if self.graph_runtime_state is None:
return

updated_variables = common_helpers.get_updated_variables(event.node_run_result.process_data) or []
if not updated_variables:
return

conversation_id = self.graph_runtime_state.system_variable.conversation_id
if conversation_id is None:
return

updated_any = False
for item in updated_variables:
selector = item.selector
if len(selector) < 2:
logger.warning("Conversation variable selector invalid. selector=%s", selector)
continue
if selector[0] != CONVERSATION_VARIABLE_NODE_ID:
continue
variable = self.graph_runtime_state.variable_pool.get(selector)
if not isinstance(variable, Variable):
logger.warning(
"Conversation variable not found in variable pool. selector=%s",
selector,
)
continue
self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable)
updated_any = True

if updated_any:
self._conversation_variable_updater.flush()

def on_graph_end(self, error: Exception | None) -> None:
pass
1 change: 0 additions & 1 deletion api/core/workflow/nodes/node_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ def create_node(self, node_config: dict[str, object]) -> Node:
code_providers=self._code_providers,
code_limits=self._code_limits,
)

if node_type == NodeType.TEMPLATE_TRANSFORM:
return TemplateTransformNode(
id=node_id,
Expand Down
21 changes: 2 additions & 19 deletions api/core/workflow/nodes/variable_assigner/v1/node.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
from collections.abc import Callable, Mapping, Sequence
from typing import TYPE_CHECKING, Any, TypeAlias
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any

from core.variables import SegmentType, Variable
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
from core.workflow.entities import GraphInitParams
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.variable_assigner.common import helpers as common_helpers
from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError

from ..common.impl import conversation_variable_updater_factory
from .node_data import VariableAssignerData, WriteMode

if TYPE_CHECKING:
from core.workflow.runtime import GraphRuntimeState


_CONV_VAR_UPDATER_FACTORY: TypeAlias = Callable[[], ConversationVariableUpdater]


class VariableAssignerNode(Node[VariableAssignerData]):
node_type = NodeType.VARIABLE_ASSIGNER
_conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY

def __init__(
self,
id: str,
config: Mapping[str, Any],
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY = conversation_variable_updater_factory,
):
super().__init__(
id=id,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
self._conv_var_updater_factory = conv_var_updater_factory

@classmethod
def version(cls) -> str:
Expand Down Expand Up @@ -96,16 +88,7 @@ def _run(self) -> NodeRunResult:
# Over write the variable.
self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable)

# TODO: Move database operation to the pipeline.
# Update conversation variable.
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"])
if not conversation_id:
raise VariableOperatorNodeError("conversation_id not found")
conv_var_updater = self._conv_var_updater_factory()
conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable)
conv_var_updater.flush()
updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)]

return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={
Expand Down
41 changes: 19 additions & 22 deletions api/core/workflow/nodes/variable_assigner/v2/node.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
import json
from collections.abc import Mapping, MutableMapping, Sequence
from typing import Any, cast
from typing import TYPE_CHECKING, Any

from core.app.entities.app_invoke_entities import InvokeFrom
from core.variables import SegmentType, Variable
from core.variables.consts import SELECTORS_LENGTH
from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID
from core.workflow.conversation_variable_updater import ConversationVariableUpdater
from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
from core.workflow.nodes.variable_assigner.common import helpers as common_helpers
from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError
from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory

from . import helpers
from .entities import VariableAssignerNodeData, VariableOperationItem
from .enums import InputType, Operation
from .exc import (
ConversationIDNotFoundError,
InputTypeNotSupportedError,
InvalidDataError,
InvalidInputValueError,
OperationNotSupportedError,
VariableNotFoundError,
)

if TYPE_CHECKING:
from core.workflow.entities import GraphInitParams
from core.workflow.runtime import GraphRuntimeState


def _target_mapping_from_item(mapping: MutableMapping[str, Sequence[str]], node_id: str, item: VariableOperationItem):
selector_node_id = item.variable_selector[0]
Expand Down Expand Up @@ -53,6 +53,20 @@ def _source_mapping_from_item(mapping: MutableMapping[str, Sequence[str]], node_
class VariableAssignerNode(Node[VariableAssignerNodeData]):
node_type = NodeType.VARIABLE_ASSIGNER

def __init__(
self,
id: str,
config: Mapping[str, Any],
graph_init_params: "GraphInitParams",
graph_runtime_state: "GraphRuntimeState",
):
super().__init__(
id=id,
config=config,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)

def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool:
"""
Check if this Variable Assigner node blocks the output of specific variables.
Expand All @@ -70,9 +84,6 @@ def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bo

return False

def _conv_var_updater_factory(self) -> ConversationVariableUpdater:
return conversation_variable_updater_factory()

@classmethod
def version(cls) -> str:
return "2"
Expand Down Expand Up @@ -179,26 +190,12 @@ def _run(self) -> NodeRunResult:
# remove the duplicated items first.
updated_variable_selectors = list(set(map(tuple, updated_variable_selectors)))

conv_var_updater = self._conv_var_updater_factory()
# Update variables
for selector in updated_variable_selectors:
variable = self.graph_runtime_state.variable_pool.get(selector)
if not isinstance(variable, Variable):
raise VariableNotFoundError(variable_selector=selector)
process_data[variable.name] = variable.value

if variable.selector[0] == CONVERSATION_VARIABLE_NODE_ID:
conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"])
if not conversation_id:
if self.invoke_from != InvokeFrom.DEBUGGER:
raise ConversationIDNotFoundError
else:
conversation_id = conversation_id.value
conv_var_updater.update(
conversation_id=cast(str, conversation_id),
variable=variable,
)
conv_var_updater.flush()
updated_variables = [
common_helpers.variable_to_processed_data(selector, seg)
for selector in updated_variable_selectors
Expand Down
4 changes: 2 additions & 2 deletions api/core/workflow/runtime/graph_runtime_state_protocol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from typing import Any, Protocol

from core.model_runtime.entities.llm_entities import LLMUsage
Expand All @@ -9,7 +9,7 @@
class ReadOnlyVariablePool(Protocol):
"""Read-only interface for VariablePool."""

def get(self, node_id: str, variable_key: str) -> Segment | None:
def get(self, selector: Sequence[str], /) -> Segment | None:
"""Get a variable value (read-only)."""
...

Expand Down
6 changes: 3 additions & 3 deletions api/core/workflow/runtime/read_only_wrappers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from collections.abc import Mapping
from collections.abc import Mapping, Sequence
from copy import deepcopy
from typing import Any

Expand All @@ -18,9 +18,9 @@ class ReadOnlyVariablePoolWrapper:
def __init__(self, variable_pool: VariablePool) -> None:
self._variable_pool = variable_pool

def get(self, node_id: str, variable_key: str) -> Segment | None:
def get(self, selector: Sequence[str], /) -> Segment | None:
"""Return a copy of a variable value if present."""
value = self._variable_pool.get([node_id, variable_key])
value = self._variable_pool.get(selector)
return deepcopy(value) if value is not None else None

def get_all_by_node(self, node_id: str) -> Mapping[str, object]:
Expand Down
2 changes: 1 addition & 1 deletion api/services/conversation_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
from core.db.session_factory import session_factory
from core.llm_generator.llm_generator import LLMGenerator
from core.variables.types import SegmentType
from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory
from extensions.ext_database import db
from factories import variable_factory
from libs.datetime_utils import naive_utc_now
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models import Account, ConversationVariable
from models.model import App, Conversation, EndUser, Message
from services.conversation_variable_updater import conversation_variable_updater_factory
from services.errors.conversation import (
ConversationNotExistsError,
ConversationVariableNotExistsError,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
from extensions.ext_database import db
from models import ConversationVariable

from .exc import VariableOperatorNodeError

class ConversationVariableNotFoundError(Exception):
pass


class ConversationVariableUpdaterImpl:
def update(self, conversation_id: str, variable: Variable):
def update(self, conversation_id: str, variable: Variable) -> None:
stmt = select(ConversationVariable).where(
ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id
)
with Session(db.engine) as session:
row = session.scalar(stmt)
if not row:
raise VariableOperatorNodeError("conversation variable not found in the database")
raise ConversationVariableNotFoundError("conversation variable not found in the database")
row.data = variable.model_dump_json()
session.commit()

def flush(self):
def flush(self) -> None:
pass


Expand Down
Loading
Loading