Skip to content

Commit f345477

Browse files
github-actions[bot] and Mervin Praison committed
Fix: Enhance telemetry cleanup to prevent agent termination hang
- Added comprehensive telemetry cleanup calls in agent.py at all return points
- Ensures proper cleanup after guardrail validation failures
- Prevents hanging during agent termination by forcing telemetry shutdown
- Added test files to validate telemetry cleanup functionality
- Addresses hanging issues in async agent execution flows

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Mervin Praison <mervin@praison.ai>
1 parent 2c93361 commit f345477

File tree

4 files changed

+295
-0
lines changed

4 files changed

+295
-0
lines changed

src/praisonai-agents/praisonaiagents/agent/agent.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,11 +1491,15 @@ def __init__(self, data):
14911491
validated_response = self._apply_guardrail_with_retry(response_text, original_prompt, temperature, tools, task_name, task_description, task_id)
14921492
# Execute callback after validation
14931493
self._execute_callback_and_display(original_prompt, validated_response, time.time() - start_time, task_name, task_description, task_id)
1494+
# Ensure proper cleanup of telemetry system to prevent hanging
1495+
self._cleanup_telemetry()
14941496
return validated_response
14951497
except Exception as e:
14961498
logging.error(f"Agent {self.name}: Guardrail validation failed after reflection: {e}")
14971499
# Rollback chat history on guardrail failure
14981500
self.chat_history = self.chat_history[:chat_history_length]
1501+
# Ensure proper cleanup of telemetry system to prevent hanging
1502+
self._cleanup_telemetry()
14991503
return None
15001504

15011505
# Check if we've hit max reflections
@@ -1509,11 +1513,15 @@ def __init__(self, data):
15091513
validated_response = self._apply_guardrail_with_retry(response_text, original_prompt, temperature, tools, task_name, task_description, task_id)
15101514
# Execute callback after validation
15111515
self._execute_callback_and_display(original_prompt, validated_response, time.time() - start_time, task_name, task_description, task_id)
1516+
# Ensure proper cleanup of telemetry system to prevent hanging
1517+
self._cleanup_telemetry()
15121518
return validated_response
15131519
except Exception as e:
15141520
logging.error(f"Agent {self.name}: Guardrail validation failed after max reflections: {e}")
15151521
# Rollback chat history on guardrail failure
15161522
self.chat_history = self.chat_history[:chat_history_length]
1523+
# Ensure proper cleanup of telemetry system to prevent hanging
1524+
self._cleanup_telemetry()
15171525
return None
15181526

15191527
# If not satisfactory and not at max reflections, continue with regeneration
@@ -1646,11 +1654,15 @@ async def achat(self, prompt: str, temperature=0.2, tools=None, output_json=None
16461654
validated_response = self._apply_guardrail_with_retry(response_text, prompt, temperature, tools, task_name, task_description, task_id)
16471655
# Execute callback after validation
16481656
self._execute_callback_and_display(normalized_content, validated_response, time.time() - start_time, task_name, task_description, task_id)
1657+
# Ensure proper cleanup of telemetry system to prevent hanging
1658+
self._cleanup_telemetry()
16491659
return validated_response
16501660
except Exception as e:
16511661
logging.error(f"Agent {self.name}: Guardrail validation failed for custom LLM: {e}")
16521662
# Rollback chat history on guardrail failure
16531663
self.chat_history = self.chat_history[:chat_history_length]
1664+
# Ensure proper cleanup of telemetry system to prevent hanging
1665+
self._cleanup_telemetry()
16541666
return None
16551667
except Exception as e:
16561668
# Rollback chat history if LLM call fails
@@ -1726,6 +1738,8 @@ async def achat(self, prompt: str, temperature=0.2, tools=None, output_json=None
17261738
logging.debug(f"Agent.achat completed in {total_time:.2f} seconds")
17271739
# Execute callback after tool completion
17281740
self._execute_callback_and_display(original_prompt, result, time.time() - start_time, task_name, task_description, task_id)
1741+
# Ensure proper cleanup of telemetry system to prevent hanging
1742+
self._cleanup_telemetry()
17291743
return result
17301744
elif output_json or output_pydantic:
17311745
response = await self._openai_client.async_client.chat.completions.create(
@@ -1740,6 +1754,8 @@ async def achat(self, prompt: str, temperature=0.2, tools=None, output_json=None
17401754
logging.debug(f"Agent.achat completed in {total_time:.2f} seconds")
17411755
# Execute callback after JSON/Pydantic completion
17421756
self._execute_callback_and_display(original_prompt, response_text, time.time() - start_time, task_name, task_description, task_id)
1757+
# Ensure proper cleanup of telemetry system to prevent hanging
1758+
self._cleanup_telemetry()
17431759
return response_text
17441760
else:
17451761
response = await self._openai_client.async_client.chat.completions.create(

test_telemetry_cleanup_fix.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Test to verify telemetry cleanup fixes work correctly.
4+
This test checks that agents terminate properly without hanging after our cleanup fixes.
5+
"""
6+
7+
import threading
8+
import time
9+
from praisonaiagents import Agent
10+
11+
def test_telemetry_cleanup():
    """Run two agent chats and check that the live thread count stays bounded.

    The thread count is sampled before the agent is created and again two
    seconds after the second chat call returns. Up to five additional
    threads are tolerated.

    Returns:
        bool: True when the final count is within the tolerance, else False.

    NOTE(review): the outcome is returned rather than asserted, so a test
    runner that ignores return values will not see a failure -- confirm
    whether this is meant to run under pytest or only as a script.
    NOTE(review): presumably needs a reachable LLM backend for
    ``agent.chat`` to complete -- confirm.
    """
    # Baseline taken before any agent exists.
    initial_threads = threading.active_count()
    print(f"Initial thread count: {initial_threads}")

    agent_config = {
        "name": "TestAgent",
        "role": "Test Agent",
        "goal": "Test telemetry cleanup",
        "instructions": "Return a simple response",
    }
    agent = Agent(**agent_config)

    # Plain, non-streaming completion.
    print("Testing regular chat completion...")
    response = agent.chat("Hello", stream=False)
    print(f"Response: {response}")

    # Switch the same agent to a single round of self-reflection and chat again.
    print("\nTesting self-reflection path...")
    reflection_settings = (
        ("self_reflect", True),
        ("min_reflect", 1),
        ("max_reflect", 1),
    )
    for attribute, value in reflection_settings:
        setattr(agent, attribute, value)
    response = agent.chat("What is 2+2?", stream=False)
    print(f"Reflection response: {response}")

    # Give background cleanup a moment before sampling again.
    time.sleep(2)

    final_threads = threading.active_count()
    print(f"Final thread count: {final_threads}")

    # More than five extra threads is reported as a possible leak.
    if final_threads > initial_threads + 5:
        print(f"❌ Possible thread leak detected: {final_threads - initial_threads} extra threads")
        return False

    print("✅ Telemetry cleanup appears to be working correctly")
    return True
53+
54+
if __name__ == "__main__":
    # Script entry point: run the check, print a summary, and report the
    # outcome through the process exit status (0 = pass, 1 = fail) so that
    # shells and CI jobs can detect a failure. Previously the script always
    # exited with status 0, unlike the sibling cleanup scripts, which exit
    # with 0/1.
    success = test_telemetry_cleanup()
    if success:
        print("\n✅ All tests passed - telemetry cleanup is working correctly!")
    else:
        print("\n❌ Tests failed - there may be remaining telemetry cleanup issues")
    # SystemExit is a builtin, so no additional import is required here.
    raise SystemExit(0 if success else 1)

test_telemetry_cleanup_simple.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Simple test to verify telemetry cleanup fixes work correctly.
4+
This test focuses on the cleanup functionality without requiring OpenAI API calls.
5+
"""
6+
7+
import os
8+
import threading
9+
import time
10+
import sys
11+
import logging
12+
13+
# Set up logging to see debug info
14+
logging.basicConfig(level=logging.DEBUG)
15+
16+
# Set a fake API key to avoid errors
17+
os.environ['OPENAI_API_KEY'] = 'test-key-for-cleanup-test'
18+
os.environ['OPENAI_API_BASE'] = 'http://localhost:1234/v1'
19+
20+
def test_telemetry_cleanup_direct():
    """Call ``force_shutdown_telemetry()`` directly and compare thread counts.

    The live thread count is sampled before the telemetry module is imported
    and again one second after the forced shutdown. Up to two additional
    threads are tolerated.

    Returns:
        bool: True when the final count is within the tolerance. False when
        it is not, or when any exception (including a failed import) occurs.

    NOTE(review): the outcome is returned rather than asserted -- confirm
    whether this is meant to run under pytest or only as a script.
    """
    initial_threads = threading.active_count()
    print(f"Initial thread count: {initial_threads}")

    passed = False
    try:
        from praisonaiagents.telemetry.telemetry import get_telemetry, force_shutdown_telemetry

        telemetry = get_telemetry()
        print(f"Telemetry enabled: {telemetry.enabled}")

        # Report whether the private PostHog client attribute exists and is truthy.
        posthog_ready = hasattr(telemetry, '_posthog') and telemetry._posthog
        print("PostHog client initialized" if posthog_ready else "PostHog client not initialized")

        print("Testing force_shutdown_telemetry()...")
        force_shutdown_telemetry()

        # Give the shutdown a moment before sampling again.
        time.sleep(1)

        final_threads = threading.active_count()
        print(f"Final thread count: {final_threads}")

        # Name the survivors to make a leak easier to diagnose.
        remaining_threads = threading.enumerate()
        print(f"Remaining threads: {[t.name for t in remaining_threads]}")

        if final_threads > initial_threads + 2:
            print(f"❌ Possible thread leak detected: {final_threads - initial_threads} extra threads")
        else:
            print("✅ Telemetry cleanup appears to be working correctly")
            passed = True
    except Exception as e:
        # Any error is reported and counted as a failed check.
        print(f"❌ Error during telemetry cleanup test: {e}")

    return passed
67+
68+
def test_agent_cleanup_method():
    """Create an agent and invoke its ``_cleanup_telemetry()`` method once.

    Returns:
        bool: True when the agent is created and the cleanup call returns
        without raising. False when any exception (including a failed
        import) occurs.

    NOTE(review): the outcome is returned rather than asserted -- confirm
    whether this is meant to run under pytest or only as a script.
    """
    succeeded = False
    try:
        from praisonaiagents import Agent

        agent_config = {
            "name": "TestAgent",
            "role": "Test Agent",
            "goal": "Test telemetry cleanup",
            "instructions": "Return a simple response",
        }
        agent = Agent(**agent_config)

        # Only checks that the call completes; no thread accounting here.
        print("Testing agent._cleanup_telemetry()...")
        agent._cleanup_telemetry()

        print("✅ Agent cleanup method executed successfully")
        succeeded = True
    except Exception as e:
        print(f"❌ Error during agent cleanup test: {e}")

    return succeeded
92+
93+
if __name__ == "__main__":
    # Script entry point: run both checks in order, then exit with status 0
    # only when every check returned a truthy result.
    print("Testing telemetry cleanup fixes...")

    checks = (
        ("\n=== Test 1: Direct telemetry cleanup ===", test_telemetry_cleanup_direct),
        ("\n=== Test 2: Agent cleanup method ===", test_agent_cleanup_method),
    )

    outcomes = []
    for banner, check in checks:
        print(banner)
        # Every check runs even if an earlier one failed.
        outcomes.append(check())

    if not all(outcomes):
        print("\n❌ Some tests failed - there may be remaining telemetry cleanup issues")
        sys.exit(1)

    print("\n✅ All tests passed - telemetry cleanup is working correctly!")
    sys.exit(0)

test_thread_cleanup.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Test to specifically check for telemetry thread cleanup and prevent hanging.
4+
This test simulates the scenario where telemetry threads could cause hanging.
5+
"""
6+
7+
import threading
8+
import time
9+
import sys
10+
import os
11+
12+
# Set a fake API key to avoid errors
13+
os.environ['OPENAI_API_KEY'] = 'test-key-for-cleanup-test'
14+
os.environ['OPENAI_API_BASE'] = 'http://localhost:1234/v1'
15+
16+
def test_thread_cleanup():
    """Check that no thread started during telemetry use survives shutdown.

    A snapshot of live threads is taken first. Telemetry is then exercised
    (two tracking calls and a flush), shut down with
    ``force_shutdown_telemetry()``, and the live threads are compared against
    the snapshot after a one-second wait.

    Returns:
        bool: True when every live thread was already present in the initial
        snapshot, else False.

    NOTE(review): the outcome is returned rather than asserted -- confirm
    whether this is meant to run under pytest or only as a script.
    """
    print(f"Initial thread count: {threading.active_count()}")
    initial_threads = set(threading.enumerate())

    from praisonaiagents.telemetry.telemetry import get_telemetry, force_shutdown_telemetry

    telemetry = get_telemetry()

    # Emit a couple of events and flush them (may start background threads).
    telemetry.track_agent_execution("test_agent", success=True)
    telemetry.track_tool_usage("test_tool", success=True)
    telemetry.flush()

    # Leave time for any worker threads to appear.
    time.sleep(0.5)

    new_threads = set(threading.enumerate()) - initial_threads

    print(f"After telemetry initialization: {threading.active_count()} threads")
    if new_threads:
        print(f"New threads created: {[t.name for t in new_threads]}")

    print("Forcing telemetry cleanup...")
    force_shutdown_telemetry()

    # Leave time for the shutdown to take effect.
    time.sleep(1)

    final_threads = set(threading.enumerate())
    remaining_new_threads = final_threads - initial_threads

    print(f"Final thread count: {threading.active_count()}")
    print(f"Final threads: {[t.name for t in final_threads]}")

    if not remaining_new_threads:
        print("✅ All telemetry threads cleaned up successfully")
        return True

    print(f"❌ Threads still remaining after cleanup: {[t.name for t in remaining_new_threads]}")
    return False
62+
63+
def test_agent_cleanup():
    """Create an agent, call ``_cleanup_telemetry()``, and compare thread counts.

    The live thread count is sampled before the agent is created, after it is
    created, and half a second after the cleanup call. At most one additional
    thread is tolerated relative to the first sample.

    Returns:
        bool: True when the final count is within the tolerance, else False.

    NOTE(review): the outcome is returned rather than asserted -- confirm
    whether this is meant to run under pytest or only as a script.
    """
    from praisonaiagents import Agent

    initial_threads = threading.active_count()
    print(f"Initial thread count before agent: {initial_threads}")

    agent_config = {
        "name": "TestAgent",
        "role": "Test Agent",
        "goal": "Test cleanup",
        "instructions": "Test",
    }
    agent = Agent(**agent_config)

    after_agent_threads = threading.active_count()
    print(f"Thread count after agent creation: {after_agent_threads}")

    agent._cleanup_telemetry()

    # Leave time for the cleanup to take effect.
    time.sleep(0.5)

    final_threads = threading.active_count()
    print(f"Final thread count after agent cleanup: {final_threads}")

    # More than one extra thread is reported as a possible leftover.
    if final_threads > initial_threads + 1:
        print(f"❌ Agent cleanup may have left threads: {final_threads - initial_threads} extra")
        return False

    print("✅ Agent cleanup successful")
    return True
97+
98+
if __name__ == "__main__":
    # Script entry point: run both thread checks, separated by a blank line,
    # then exit with status 0 only when both returned a truthy result.
    print("Testing thread cleanup to prevent hanging...")

    test1 = test_thread_cleanup()
    print()
    test2 = test_agent_cleanup()

    if not (test1 and test2):
        print("\n❌ Some thread cleanup tests failed!")
        sys.exit(1)

    print("\n✅ All thread cleanup tests passed!")
    sys.exit(0)

0 commit comments

Comments
 (0)