Skip to content

Commit d8d9f46

Browse files
Merge pull request #1010 from MervinPraison/claude/issue-1009-20250718-1740
fix: prevent telemetry shutdown from hanging indefinitely
2 parents e350f7d + be68f3e commit d8d9f46

File tree

1 file changed

+83
-49
lines changed

1 file changed

+83
-49
lines changed

src/praisonai-agents/praisonaiagents/telemetry/telemetry.py

Lines changed: 83 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ def __init__(self, enabled: bool = None):
5656

5757
self.logger = logging.getLogger(__name__)
5858

59+
# Add shutdown tracking to prevent double shutdown
60+
self._shutdown_complete = False
61+
self._shutdown_lock = threading.Lock()
62+
5963
if not self.enabled:
6064
self.logger.debug("Telemetry is disabled")
6165
return
@@ -72,6 +76,7 @@ def __init__(self, enabled: bool = None):
7276
"errors": 0,
7377
}
7478
self._metrics_lock = threading.Lock()
79+
self._max_timing_entries = 1000 # Limit to prevent memory leaks
7580

7681
# Collect basic environment info (anonymous)
7782
self._environment = {
@@ -102,7 +107,7 @@ def _get_framework_version(self) -> str:
102107
try:
103108
from .. import __version__
104109
return __version__
105-
except ImportError:
110+
except (ImportError, KeyError, AttributeError):
106111
return "unknown"
107112

108113
def track_agent_execution(self, agent_name: str = None, success: bool = True):
@@ -174,15 +179,21 @@ def track_tool_usage(self, tool_name: str, success: bool = True, execution_time:
174179
with self._metrics_lock:
175180
self._metrics["tool_calls"] += 1
176181

177-
# Add timing metrics if provided
182+
# Add timing metrics if provided (with memory management)
178183
if execution_time is not None:
179184
if "tool_execution_times" not in self._metrics:
180185
self._metrics["tool_execution_times"] = []
181-
self._metrics["tool_execution_times"].append({
186+
187+
timing_list = self._metrics["tool_execution_times"]
188+
timing_list.append({
182189
"tool_name": tool_name,
183190
"execution_time": execution_time,
184191
"success": success
185192
})
193+
194+
# Prevent memory leaks by limiting stored entries
195+
if len(timing_list) > self._max_timing_entries:
196+
timing_list[:] = timing_list[-self._max_timing_entries:]
186197

187198
# Send event to PostHog
188199
if self._posthog:
@@ -322,65 +333,88 @@ def shutdown(self):
322333
"""
323334
if not self.enabled:
324335
return
336+
337+
# Use lock to prevent concurrent shutdown calls
338+
with self._shutdown_lock:
339+
if self._shutdown_complete:
340+
return
341+
self._shutdown_complete = True
325342

326343
# Final flush
327344
self.flush()
328345

329346
# Shutdown PostHog if available
330-
if hasattr(self, '_posthog') and self._posthog:
347+
posthog_client = getattr(self, '_posthog', None)
348+
if posthog_client:
331349
try:
332-
# Force a synchronous flush before shutdown
333-
self._posthog.flush()
334-
335-
# Get the PostHog client's internal thread pool for cleanup
336-
if hasattr(self._posthog, '_thread_pool'):
337-
thread_pool = self._posthog._thread_pool
338-
if thread_pool:
339-
try:
340-
# Stop accepting new tasks
341-
thread_pool.shutdown(wait=False)
342-
# Wait for threads to finish with timeout
343-
thread_pool.shutdown(wait=True)
344-
except:
345-
pass
346-
347-
# Force shutdown of any remaining threads
348-
if hasattr(self._posthog, '_consumer'):
349-
try:
350-
self._posthog._consumer.flush()
351-
self._posthog._consumer.shutdown()
352-
except:
353-
pass
354-
355-
# Standard shutdown
356-
self._posthog.shutdown()
357-
358-
# Additional cleanup - force thread termination
350+
# Use a timeout-based flush to prevent hanging
359351
import threading
360352
import time
353+
import concurrent.futures
361354

362-
# Wait up to 2 seconds for threads to terminate
363-
max_wait = 2.0
364-
start_time = time.time()
365-
366-
while time.time() - start_time < max_wait:
367-
# Check for any PostHog related threads
368-
posthog_threads = [
369-
t for t in threading.enumerate()
370-
if t != threading.current_thread()
371-
and not t.daemon
372-
and ('posthog' in t.name.lower() or 'analytics' in t.name.lower())
373-
]
355+
# Use ThreadPoolExecutor for better control
356+
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
357+
flush_future = executor.submit(self._safe_flush_posthog, posthog_client)
374358

375-
if not posthog_threads:
376-
break
377-
378-
time.sleep(0.1)
359+
try:
360+
flush_future.result(timeout=5.0) # 5 second timeout
361+
self.logger.debug("PostHog flush completed successfully")
362+
except concurrent.futures.TimeoutError:
363+
self.logger.warning("PostHog flush timed out")
364+
flush_future.cancel()
365+
except Exception as e:
366+
self.logger.error(f"PostHog flush failed: {e}")
367+
368+
# Cleanup PostHog threads safely
369+
self._shutdown_posthog_threads(posthog_client)
370+
371+
# Standard shutdown
372+
posthog_client.shutdown()
379373

380374
except Exception as e:
381375
# Log the error but don't fail shutdown
382-
self.logger.debug(f"Error during PostHog shutdown: {e}")
383-
pass
376+
self.logger.error(f"Error during PostHog shutdown: {e}")
377+
finally:
378+
self._posthog = None
379+
380+
def _safe_flush_posthog(self, posthog_client):
381+
"""Safely flush PostHog data with error handling."""
382+
try:
383+
posthog_client.flush()
384+
return True
385+
except Exception as e:
386+
self.logger.debug(f"PostHog flush error: {e}")
387+
return False
388+
389+
def _shutdown_posthog_threads(self, posthog_client):
390+
"""Safely shutdown PostHog background threads."""
391+
try:
392+
# Access thread pool safely (fix double shutdown issue)
393+
thread_pool = getattr(posthog_client, '_thread_pool', None)
394+
if thread_pool:
395+
try:
396+
# Single shutdown call with timeout
397+
if hasattr(thread_pool, 'shutdown'):
398+
thread_pool.shutdown(wait=False)
399+
# Wait briefly for graceful shutdown
400+
import time
401+
time.sleep(0.5)
402+
except Exception as e:
403+
self.logger.debug(f"Thread pool shutdown error: {e}")
404+
405+
# Clean up consumer
406+
consumer = getattr(posthog_client, '_consumer', None)
407+
if consumer:
408+
try:
409+
if hasattr(consumer, 'flush'):
410+
consumer.flush()
411+
if hasattr(consumer, 'shutdown'):
412+
consumer.shutdown()
413+
except Exception as e:
414+
self.logger.debug(f"Consumer shutdown error: {e}")
415+
416+
except Exception as e:
417+
self.logger.debug(f"Error during PostHog thread cleanup: {e}")
384418

385419

386420
# Global telemetry instance

0 commit comments

Comments
 (0)