Skip to content

Conversation

@IlyaGusev
Copy link
Owner

No description provided.

@github-actions
Copy link

github-actions bot commented Oct 8, 2025

Title

(Describe updated until commit bb2b17c)

Add client tests and fix event stream session termination logic


PR Type

Tests, Bug fix, Enhancement


Description

  • Added comprehensive client URL composition and agent query tests

  • Fixed event stream session finishing logic with interruption handling

  • Added AGENT_INTERRUPTED event type for graceful session termination

  • Implemented timeout-based queue polling to prevent hanging sessions


Diagram Walkthrough

flowchart LR
  A["Client Tests"] --> B["URL Composition"]
  A --> C["Agent Query/Stop"]
  D["Event Bus"] --> E["AGENT_INTERRUPTED Event"]
  D --> F["Timeout Polling"]
  F --> G["Session Cleanup"]
Loading

File Walkthrough

Relevant files
Tests
test_client.py
Add comprehensive client functionality tests                         

tests/test_client.py

  • Added URL composition tests for base, agent, and cancel endpoints
  • Implemented agent query test with multiplication calculation
    verification
  • Added agent stop test to verify interruption functionality
+75/-0   
Bug fix
event_bus.py
Fix event streaming and add interruption handling               

codearkt/event_bus.py

  • Added AGENT_INTERRUPTED event type for session interruption
  • Implemented _get_event method with timeout-based queue polling
  • Fixed stream_events termination logic to handle both end and
    interruption
  • Changed loop condition to properly check root agent completion
+24/-7   
Configuration changes
settings.py
Add finish wait timeout configuration                                       

codearkt/settings.py

  • Added FINISH_WAIT_TIMEOUT setting with 10 second default value
+1/-0     
Miscellaneous
pyproject.toml
Update package version to 1.9.5                                                   

pyproject.toml

  • Bumped version from 1.9.4 to 1.9.5
+1/-1     

@github-actions
Copy link

github-actions bot commented Oct 8, 2025

PR Reviewer Guide 🔍

(Review updated until commit bb2b17c)

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Race Condition

The _get_event method checks if queue exists, then waits for an event, but the queue could be removed between checks. This could lead to accessing a None queue or missing the AGENT_INTERRUPTED event if the queue is removed during the timeout wait.

async def _get_event(self, session_id: str, root_agent_name: str) -> AgentEvent:
    queue = self.queues.get(session_id)
    while queue is not None:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=settings.FINISH_WAIT_TIMEOUT)
            return event
        except asyncio.TimeoutError:
            pass
        queue = self.queues.get(session_id)
    return AgentEvent(
        session_id=session_id,
        agent_name=root_agent_name,
        event_type=EventType.AGENT_INTERRUPTED,
    )
Logic Error

The loop condition uses AND operator for is_agent_end and is_root_agent, but is_root_agent is initialized to False. This means the loop will continue even after receiving AGENT_END from root agent on first iteration, potentially causing unexpected behavior.

is_root_agent = False
while not (is_agent_end and is_root_agent):
    try:
        event = await asyncio.wait_for(
            self._get_event(session_id, root_agent_name),
            timeout=settings.EVENT_BUS_STREAM_TIMEOUT,
        )
        yield event
        is_agent_end = event.event_type in (
            EventType.AGENT_END,
            EventType.AGENT_INTERRUPTED,
        )
        is_root_agent = event.agent_name == root_agent_name
Test Logic Issue

The test calls stop_agent inside the event loop on every iteration, which will repeatedly attempt to stop the same session. This doesn't properly test the stop functionality and may cause unexpected behavior or race conditions.

async def test_client_stop(mcp_server_test: MCPServerTest) -> None:
    history = [ChatMessage(role="user", content="What is 432412421249 * 4332144219?")]
    output_text = ""
    session_id = get_unique_id()
    for event in query_agent(
        history,
        host=mcp_server_test.host,
        port=mcp_server_test.port,
        agent_name="nested_agent",
        session_id=session_id,
    ):
        if event.event_type == EventType.OUTPUT and event.content:
            output_text += event.content
        stop_agent(session_id, host=mcp_server_test.host, port=mcp_server_test.port)
    assert "1873272970937648109531" not in output_text

@github-actions
Copy link

github-actions bot commented Oct 8, 2025

PR Code Suggestions ✨

Latest suggestions up to bb2b17c
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix premature agent interruption

Calling stop_agent on every iteration will immediately interrupt the agent before it
can produce meaningful output. Move the stop call outside the loop or add a
condition to stop after processing some events.

tests/test_client.py [65-74]

+event_count = 0
 for event in query_agent(
     history,
     host=mcp_server_test.host,
     port=mcp_server_test.port,
     agent_name="nested_agent",
     session_id=session_id,
 ):
     if event.event_type == EventType.OUTPUT and event.content:
         output_text += event.content
-    stop_agent(session_id, host=mcp_server_test.host, port=mcp_server_test.port)
+    event_count += 1
+    if event_count >= 2:
+        stop_agent(session_id, host=mcp_server_test.host, port=mcp_server_test.port)
Suggestion importance[1-10]: 9

__

Why: The suggestion identifies a critical bug where stop_agent is called on every iteration, immediately interrupting the agent before it can produce output. This explains why the test expects the result to NOT be in output_text, but the current implementation is flawed for a proper test.

High
General
Prevent busy-waiting in timeout loop

The infinite loop can cause high CPU usage when the queue exists but is empty. Add a
small delay after timeout to prevent busy-waiting and reduce resource consumption.

codearkt/event_bus.py [66-79]

 async def _get_event(self, session_id: str, root_agent_name: str) -> AgentEvent:
     queue = self.queues.get(session_id)
     while queue is not None:
         try:
             event = await asyncio.wait_for(queue.get(), timeout=settings.FINISH_WAIT_TIMEOUT)
             return event
         except asyncio.TimeoutError:
-            pass
+            await asyncio.sleep(0.1)
         queue = self.queues.get(session_id)
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential busy-waiting issue in the _get_event method. Adding a small delay after timeout would reduce CPU usage when the queue exists but is empty, improving resource efficiency.

Medium

Previous suggestions

Suggestions up to commit cb26d38
CategorySuggestion                                                                                                                                    Impact
Possible issue
Add retry limit to prevent infinite loop

The infinite retry loop on timeout could cause the method to hang indefinitely if
the queue never receives events. Consider adding a maximum retry count or total
timeout to prevent potential deadlocks and resource exhaustion.

codearkt/event_bus.py [65-74]

 async def _get_event(self, session_id: str) -> AgentEvent:
-    while True:
+    max_retries = 10
+    retry_count = 0
+    while retry_count < max_retries:
         queue = self.queues.get(session_id)
         if not queue:
             raise RuntimeError(f"{session_id} is not tied to an agent anymore")
         try:
             event = await asyncio.wait_for(queue.get(), timeout=settings.FINISH_WAIT_TIMEOUT)
             return event
         except asyncio.TimeoutError:
+            retry_count += 1
             continue
+    raise RuntimeError(f"Maximum retries exceeded for session {session_id}")
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential issue where the infinite loop could hang indefinitely. Adding a retry limit is a reasonable safeguard, though the severity depends on the application's expected behavior and whether timeouts are expected to be transient.

Medium

@codecov
Copy link

codecov bot commented Oct 8, 2025

Codecov Report

❌ Patch coverage is 86.66667% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 65.99%. Comparing base (e420349) to head (e904e61).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
codearkt/event_bus.py 85.71% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #11      +/-   ##
==========================================
+ Coverage   62.61%   65.99%   +3.37%     
==========================================
  Files          14       14              
  Lines        1482     1491       +9     
  Branches      178      178              
==========================================
+ Hits          928      984      +56     
+ Misses        518      471      -47     
  Partials       36       36              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@IlyaGusev IlyaGusev closed this Oct 8, 2025
@IlyaGusev IlyaGusev reopened this Oct 8, 2025
@github-actions
Copy link

github-actions bot commented Oct 8, 2025

Persistent review updated to latest commit bb2b17c

@IlyaGusev IlyaGusev merged commit f1073b9 into main Oct 8, 2025
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants