fix(api): defer streaming response until referenced variables are updated#14
Conversation
WalkthroughThese changes enhance the VariableAssignerNode by introducing a new method to check if variable output is blocked and modifying result input computation based on write mode (CLEAR vs. overwrite). A comprehensive test fixture and unit test validate the overwrite mode behavior with streaming variable propagation. Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~12 minutes Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@api/core/workflow/nodes/variable_assigner/v1/node.py`:
- Line 2: The membership check fails because assigned_variable_selector is a
list while variable_selectors is typed/used as a set of tuples
(variable_selectors), so a list will never equal a tuple; fix by normalizing
assigned_variable_selector to a tuple before checking membership (e.g.,
tuple(assigned_variable_selector)) and update the type hints from
typing.Set/typing.Tuple to modern lowercase annotations (set[tuple[str, ...]])
to match the base class/v2 node; apply the same normalization and annotation
change for the other occurrence around the block referenced (lines ~36-43) where
the same comparison is performed.
In
`@api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py`:
- Around line 50-77: The v1 Variable Assigner's blocks_variable_output currently
compares assigned_variable_selector (a Sequence[str]) directly against a
Set[Tuple[str, ...]], which always fails; update the blocks_variable_output
method to convert self.node_data.assigned_variable_selector to a tuple before
membership testing (i.e., use tuple(self.node_data.assigned_variable_selector)
and test that against variable_selectors) so blocking works correctly; also
extend the test in
test_streaming_conversation_variables_v1_overwrite_waits_for_assignment to
assert blocking behavior (e.g., ensure streaming chunks are not emitted until
after the assignment completes) rather than only asserting eventual events and
values.
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
api/core/workflow/nodes/variable_assigner/v1/node.pyapi/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.ymlapi/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py
🧰 Additional context used
🧬 Code graph analysis (1)
api/core/workflow/nodes/variable_assigner/v1/node.py (2)
api/core/workflow/nodes/base/node.py (2)
blocks_variable_output(435-445)node_data(535-537)api/core/workflow/nodes/variable_assigner/v2/node.py (1)
blocks_variable_output(70-85)
🔇 Additional comments (2)
api/core/workflow/nodes/variable_assigner/v1/node.py (1)
101-111: LGTM!The logic correctly differentiates between write modes for reporting input values: CLEAR mode reports the resulting value since there's no meaningful external input, while OVER_WRITE/APPEND modes report the actual incoming value.
api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml (1)
1-158: LGTM!The fixture is well-structured for testing the v1 Variable Assigner overwrite mode with streaming conversation variables. The workflow correctly chains Start → Variable Assigner (over-write) → Answer, with the answer template referencing the updated conversation variable.
✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.
| @@ -1,5 +1,5 @@ | |||
| from collections.abc import Mapping, Sequence | |||
| from typing import TYPE_CHECKING, Any | |||
| from typing import TYPE_CHECKING, Any, Set, Tuple | |||
There was a problem hiding this comment.
Bug: List-to-tuple comparison will always return False.
The assigned_variable_selector is a list (as seen in the YAML fixture), but variable_selectors is a Set[Tuple[str, ...]]. In Python, a list can never equal a tuple, so this membership check will always return False.
Additionally, the type annotation uses uppercase Set[Tuple[...]] from typing, while the base class and v2 node use the modern lowercase set[tuple[...]] style (Python 3.9+).
🐛 Proposed fix
-from typing import TYPE_CHECKING, Any, Set, Tuple
+from typing import TYPE_CHECKING, Any- def blocks_variable_output(self, variable_selectors: Set[Tuple[str, ...]]) -> bool:
+ def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool:
"""
Check if this Variable Assigner node blocks the output of specific variables.
Returns True if this node updates any of the requested conversation variables.
"""
- assigned_selector = self.node_data.assigned_variable_selector
- return assigned_selector in variable_selectors
+ assigned_selector_tuple = tuple(self.node_data.assigned_variable_selector)
+ return assigned_selector_tuple in variable_selectorsAlso applies to: 36-43
🤖 Prompt for AI Agents
In `@api/core/workflow/nodes/variable_assigner/v1/node.py` at line 2, The
membership check fails because assigned_variable_selector is a list while
variable_selectors is typed/used as a set of tuples (variable_selectors), so a
list will never equal a tuple; fix by normalizing assigned_variable_selector to
a tuple before checking membership (e.g., tuple(assigned_variable_selector)) and
update the type hints from typing.Set/typing.Tuple to modern lowercase
annotations (set[tuple[str, ...]]) to match the base class/v2 node; apply the
same normalization and annotation change for the other occurrence around the
block referenced (lines ~36-43) where the same comparison is performed.
| def test_streaming_conversation_variables_v1_overwrite_waits_for_assignment(): | ||
| fixture_name = "test_streaming_conversation_variables_v1_overwrite" | ||
| input_query = "overwrite-value" | ||
|
|
||
| case = WorkflowTestCase( | ||
| fixture_path=fixture_name, | ||
| use_auto_mock=False, | ||
| mock_config=MockConfigBuilder().build(), | ||
| query=input_query, | ||
| inputs={}, | ||
| expected_outputs={"answer": f"Current Value Of `conv_var` is:{input_query}"}, | ||
| ) | ||
|
|
||
| runner = TableTestRunner() | ||
| result = runner.run_test_case(case) | ||
| assert result.success, f"Test failed: {result.error}" | ||
|
|
||
| events = result.events | ||
| conv_var_chunk_events = [ | ||
| event | ||
| for event in events | ||
| if isinstance(event, NodeRunStreamChunkEvent) and event.selector == ["conversation", "conv_var"] | ||
| ] | ||
|
|
||
| assert conv_var_chunk_events, "Expected conversation variable chunk events to be emitted" | ||
| assert all(event.chunk == input_query for event in conv_var_chunk_events), ( | ||
| "Expected streamed conversation variable value to match the input query" | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -type f -name "*.py" | head -20Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 935
🏁 Script executed:
rg "blocks_variable_output" -t py -B 3 -A 10 | head -100Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 7135
🏁 Script executed:
rg "def blocks_variable_output" -t py -A 15Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 6462
🏁 Script executed:
fd "test_streaming_conversation_variables.py" -t fRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 171
🏁 Script executed:
rg "assigned_variable_selector" -t py -B 3 -A 3 | head -80Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 8739
🏁 Script executed:
cat -n api/core/workflow/nodes/variable_assigner/v1/node.py | head -150Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 6056
🏁 Script executed:
rg "class.*NodeData" api/core/workflow/nodes/variable_assigner/v1/ -t py -A 20Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 471
🏁 Script executed:
cat -n api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.pyRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 3336
🏁 Script executed:
rg "blocks_variable_output" -t py -B 5 -A 5Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 5652
🏁 Script executed:
cat -n api/core/workflow/graph_engine/response_coordinator/coordinator.py | head -200Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 9035
🏁 Script executed:
fd "test_streaming_conversation_variables_v1_overwrite" -t dRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 65
🏁 Script executed:
find . -type f -name "*test_streaming_conversation_variables_v1_overwrite*" 2>/dev/nullRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 167
🏁 Script executed:
rg "test_streaming_conversation_variables_v1_overwrite" -t py -B 2 -A 10Repository: qodo-benchmark/dify-combined-coderabbit
Length of output: 1798
🏁 Script executed:
cat -n api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.ymlRepository: qodo-benchmark/dify-combined-coderabbit
Length of output: 4674
Fix the type mismatch in blocks_variable_output and enhance test coverage for the blocking behavior.
The v1 Variable Assigner's blocks_variable_output has a critical bug: it compares assigned_variable_selector (a Sequence[str]) directly against a Set[Tuple[str, ...]]. In Python, a list will never equal a tuple, so this check always returns False, defeating the blocking mechanism entirely.
The test assertions verify output correctness and event emission but do not validate the actual blocking behavior (waiting for assignment before streaming). Apply the same fix from v2: convert assigned_variable_selector to a tuple before checking membership.
def blocks_variable_output(self, variable_selectors: Set[Tuple[str, ...]]) -> bool:
assigned_selector_tuple = tuple(self.node_data.assigned_variable_selector)
return assigned_selector_tuple in variable_selectorsConsider enhancing the test to verify that streaming is delayed until after the variable assignment completes, not just that events are eventually emitted.
🤖 Prompt for AI Agents
In
`@api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py`
around lines 50 - 77, The v1 Variable Assigner's blocks_variable_output
currently compares assigned_variable_selector (a Sequence[str]) directly against
a Set[Tuple[str, ...]], which always fails; update the blocks_variable_output
method to convert self.node_data.assigned_variable_selector to a tuple before
membership testing (i.e., use tuple(self.node_data.assigned_variable_selector)
and test that against variable_selectors) so blocking works correctly; also
extend the test in
test_streaming_conversation_variables_v1_overwrite_waits_for_assignment to
assert blocking behavior (e.g., ensure streaming chunks are not emitted until
after the assignment completes) rather than only asserting eventual events and
values.
Benchmark PR from qodo-benchmark#423
Summary by CodeRabbit
New Features
Tests
✏️ Tip: You can customize this high-level summary in your review settings.