Skip to content

fix(api): defer streaming response until referenced variables are updated#14

Open
tomerqodo wants to merge 3 commits intocoderabbit_combined_20260121_augment_sentry_coderabbit_1_base_fixapi_defer_streaming_response_until_referenced_variables_are_updated_pr423from
coderabbit_combined_20260121_augment_sentry_coderabbit_1_head_fixapi_defer_streaming_response_until_referenced_variables_are_updated_pr423
Open

fix(api): defer streaming response until referenced variables are updated#14
tomerqodo wants to merge 3 commits intocoderabbit_combined_20260121_augment_sentry_coderabbit_1_base_fixapi_defer_streaming_response_until_referenced_variables_are_updated_pr423from
coderabbit_combined_20260121_augment_sentry_coderabbit_1_head_fixapi_defer_streaming_response_until_referenced_variables_are_updated_pr423

Conversation

@tomerqodo
Copy link

@tomerqodo tomerqodo commented Jan 22, 2026

Benchmark PR from qodo-benchmark#423

Summary by CodeRabbit

  • New Features

    • Enhanced variable assignment in workflows with improved support for variable overwriting modes.
    • Improved handling of conversation variables in streaming workflows.
  • Tests

    • Added test coverage for conversation variable overwriting behavior in streaming workflows.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 22, 2026

Walkthrough

These changes enhance the VariableAssignerNode by introducing a new method to check if variable output is blocked and modifying result input computation based on write mode (CLEAR vs. overwrite). A comprehensive test fixture and unit test validate the overwrite mode behavior with streaming variable propagation.

Changes

Cohort / File(s) Summary
Core Logic Enhancement
api/core/workflow/nodes/variable_assigner/v1/node.py
Added blocks_variable_output() method to check if assigned variable selector matches provided selectors. Modified _run() to compute result_input_value conditionally: uses updated_variable.to_object() for CLEAR mode, else uses income_value.to_object(). Extended typing imports with Set and Tuple.
Test Fixture & Unit Test
api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml, api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py
Added YAML test fixture defining a streaming workflow with Start → VariableAssigner (overwrite mode) → Answer nodes. Created corresponding unit test test_streaming_conversation_variables_v1_overwrite_waits_for_assignment() validating output streaming and variable value propagation.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐰 A blocker is born to guard the stream,
Variables overwrite and gleam,
From Start to Answer, values flow,
Testing proves what devs should know! ✨

🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (2 warnings)
Check name Status Explanation Resolution
Description check ⚠️ Warning The pull request description is minimal and only references a benchmark PR, lacking the required summary, context, screenshots, and checklist completion details specified in the template. Provide a detailed summary of the changes, explain the motivation and context, include any relevant screenshots (before/after), and complete the checklist items to align with the repository's contribution template.
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (1 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly describes the main change: deferring streaming response until referenced variables are updated, which directly relates to the VariableAssignerNode modifications and test coverage added.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@api/core/workflow/nodes/variable_assigner/v1/node.py`:
- Line 2: The membership check fails because assigned_variable_selector is a
list while variable_selectors is typed/used as a set of tuples
(variable_selectors), so a list will never equal a tuple; fix by normalizing
assigned_variable_selector to a tuple before checking membership (e.g.,
tuple(assigned_variable_selector)) and update the type hints from
typing.Set/typing.Tuple to modern lowercase annotations (set[tuple[str, ...]])
to match the base class/v2 node; apply the same normalization and annotation
change for the other occurrence around the block referenced (lines ~36-43) where
the same comparison is performed.

In
`@api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py`:
- Around line 50-77: The v1 Variable Assigner's blocks_variable_output currently
compares assigned_variable_selector (a Sequence[str]) directly against a
Set[Tuple[str, ...]], which always fails; update the blocks_variable_output
method to convert self.node_data.assigned_variable_selector to a tuple before
membership testing (i.e., use tuple(self.node_data.assigned_variable_selector)
and test that against variable_selectors) so blocking works correctly; also
extend the test in
test_streaming_conversation_variables_v1_overwrite_waits_for_assignment to
assert blocking behavior (e.g., ensure streaming chunks are not emitted until
after the assignment completes) rather than only asserting eventual events and
values.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6e8ab5a and 504c8cf.

📒 Files selected for processing (3)
  • api/core/workflow/nodes/variable_assigner/v1/node.py
  • api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml
  • api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py
🧰 Additional context used
🧬 Code graph analysis (1)
api/core/workflow/nodes/variable_assigner/v1/node.py (2)
api/core/workflow/nodes/base/node.py (2)
  • blocks_variable_output (435-445)
  • node_data (535-537)
api/core/workflow/nodes/variable_assigner/v2/node.py (1)
  • blocks_variable_output (70-85)
🔇 Additional comments (2)
api/core/workflow/nodes/variable_assigner/v1/node.py (1)

101-111: LGTM!

The logic correctly differentiates between write modes for reporting input values: CLEAR mode reports the resulting value since there's no meaningful external input, while OVER_WRITE/APPEND modes report the actual incoming value.

api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml (1)

1-158: LGTM!

The fixture is well-structured for testing the v1 Variable Assigner overwrite mode with streaming conversation variables. The workflow correctly chains Start → Variable Assigner (over-write) → Answer, with the answer template referencing the updated conversation variable.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

@@ -1,5 +1,5 @@
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Set, Tuple
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Bug: List-to-tuple comparison will always return False.

The assigned_variable_selector is a list (as seen in the YAML fixture), but variable_selectors is a Set[Tuple[str, ...]]. In Python, a list can never equal a tuple, so this membership check will always return False.

Additionally, the type annotation uses uppercase Set[Tuple[...]] from typing, while the base class and v2 node use the modern lowercase set[tuple[...]] style (Python 3.9+).

🐛 Proposed fix
-from typing import TYPE_CHECKING, Any, Set, Tuple
+from typing import TYPE_CHECKING, Any
-    def blocks_variable_output(self, variable_selectors: Set[Tuple[str, ...]]) -> bool:
+    def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool:
         """
         Check if this Variable Assigner node blocks the output of specific variables.
 
         Returns True if this node updates any of the requested conversation variables.
         """
-        assigned_selector = self.node_data.assigned_variable_selector
-        return assigned_selector in variable_selectors
+        assigned_selector_tuple = tuple(self.node_data.assigned_variable_selector)
+        return assigned_selector_tuple in variable_selectors

Also applies to: 36-43

🤖 Prompt for AI Agents
In `@api/core/workflow/nodes/variable_assigner/v1/node.py` at line 2, The
membership check fails because assigned_variable_selector is a list while
variable_selectors is typed/used as a set of tuples (variable_selectors), so a
list will never equal a tuple; fix by normalizing assigned_variable_selector to
a tuple before checking membership (e.g., tuple(assigned_variable_selector)) and
update the type hints from typing.Set/typing.Tuple to modern lowercase
annotations (set[tuple[str, ...]]) to match the base class/v2 node; apply the
same normalization and annotation change for the other occurrence around the
block referenced (lines ~36-43) where the same comparison is performed.

Comment on lines +50 to +77
def test_streaming_conversation_variables_v1_overwrite_waits_for_assignment():
fixture_name = "test_streaming_conversation_variables_v1_overwrite"
input_query = "overwrite-value"

case = WorkflowTestCase(
fixture_path=fixture_name,
use_auto_mock=False,
mock_config=MockConfigBuilder().build(),
query=input_query,
inputs={},
expected_outputs={"answer": f"Current Value Of `conv_var` is:{input_query}"},
)

runner = TableTestRunner()
result = runner.run_test_case(case)
assert result.success, f"Test failed: {result.error}"

events = result.events
conv_var_chunk_events = [
event
for event in events
if isinstance(event, NodeRunStreamChunkEvent) and event.selector == ["conversation", "conv_var"]
]

assert conv_var_chunk_events, "Expected conversation variable chunk events to be emitted"
assert all(event.chunk == input_query for event in conv_var_chunk_events), (
"Expected streamed conversation variable value to match the input query"
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "*.py" | head -20

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 935


🏁 Script executed:

rg "blocks_variable_output" -t py -B 3 -A 10 | head -100

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 7135


🏁 Script executed:

rg "def blocks_variable_output" -t py -A 15

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 6462


🏁 Script executed:

fd "test_streaming_conversation_variables.py" -t f

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 171


🏁 Script executed:

rg "assigned_variable_selector" -t py -B 3 -A 3 | head -80

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 8739


🏁 Script executed:

cat -n api/core/workflow/nodes/variable_assigner/v1/node.py | head -150

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 6056


🏁 Script executed:

rg "class.*NodeData" api/core/workflow/nodes/variable_assigner/v1/ -t py -A 20

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 471


🏁 Script executed:

cat -n api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 3336


🏁 Script executed:

rg "blocks_variable_output" -t py -B 5 -A 5

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 5652


🏁 Script executed:

cat -n api/core/workflow/graph_engine/response_coordinator/coordinator.py | head -200

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 9035


🏁 Script executed:

fd "test_streaming_conversation_variables_v1_overwrite" -t d

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 65


🏁 Script executed:

find . -type f -name "*test_streaming_conversation_variables_v1_overwrite*" 2>/dev/null

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 167


🏁 Script executed:

rg "test_streaming_conversation_variables_v1_overwrite" -t py -B 2 -A 10

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 1798


🏁 Script executed:

cat -n api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml

Repository: qodo-benchmark/dify-combined-coderabbit

Length of output: 4674


Fix the type mismatch in blocks_variable_output and enhance test coverage for the blocking behavior.

The v1 Variable Assigner's blocks_variable_output has a critical bug: it compares assigned_variable_selector (a Sequence[str]) directly against a Set[Tuple[str, ...]]. In Python, a list will never equal a tuple, so this check always returns False, defeating the blocking mechanism entirely.

The test assertions verify output correctness and event emission but do not validate the actual blocking behavior (waiting for assignment before streaming). Apply the same fix from v2: convert assigned_variable_selector to a tuple before checking membership.

def blocks_variable_output(self, variable_selectors: Set[Tuple[str, ...]]) -> bool:
    assigned_selector_tuple = tuple(self.node_data.assigned_variable_selector)
    return assigned_selector_tuple in variable_selectors

Consider enhancing the test to verify that streaming is delayed until after the variable assignment completes, not just that events are eventually emitted.

🤖 Prompt for AI Agents
In
`@api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py`
around lines 50 - 77, The v1 Variable Assigner's blocks_variable_output
currently compares assigned_variable_selector (a Sequence[str]) directly against
a Set[Tuple[str, ...]], which always fails; update the blocks_variable_output
method to convert self.node_data.assigned_variable_selector to a tuple before
membership testing (i.e., use tuple(self.node_data.assigned_variable_selector)
and test that against variable_selectors) so blocking works correctly; also
extend the test in
test_streaming_conversation_variables_v1_overwrite_waits_for_assignment to
assert blocking behavior (e.g., ensure streaming chunks are not emitted until
after the assignment completes) rather than only asserting eventual events and
values.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants