Skip to content

Commit ef90ae0

Browse files
Merge pull request #1000 from MervinPraison/claude/fix-telemetry-termination-20250718
Fix: Comprehensive telemetry cleanup to prevent agent termination hang
2 parents 87a5b1c + d8b1a09 commit ef90ae0

File tree

3 files changed

+94
-6
lines changed

3 files changed

+94
-6
lines changed

src/praisonai-agents/praisonaiagents/agent/agent.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1958,12 +1958,10 @@ def _cleanup_telemetry(self):
19581958
"""Clean up telemetry system to ensure proper program termination."""
19591959
try:
19601960
# Import here to avoid circular imports
1961-
from ..telemetry import get_telemetry
1961+
from ..telemetry import force_shutdown_telemetry
19621962

1963-
# Get the global telemetry instance and shut it down
1964-
telemetry = get_telemetry()
1965-
if telemetry and hasattr(telemetry, 'shutdown'):
1966-
telemetry.shutdown()
1963+
# Force shutdown of telemetry system with comprehensive cleanup
1964+
force_shutdown_telemetry()
19671965
except Exception as e:
19681966
# Log error but don't fail the execution
19691967
logging.debug(f"Error cleaning up telemetry: {e}")

src/praisonai-agents/praisonaiagents/telemetry/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
'get_telemetry',
2525
'enable_telemetry',
2626
'disable_telemetry',
27+
'force_shutdown_telemetry',
2728
'MinimalTelemetry',
2829
'TelemetryCollector', # For backward compatibility
2930
]
@@ -47,6 +48,12 @@ def disable_telemetry():
4748
_disable_telemetry()
4849

4950

51+
def force_shutdown_telemetry():
52+
"""Force shutdown of telemetry system with comprehensive cleanup."""
53+
from .telemetry import force_shutdown_telemetry as _force_shutdown_telemetry
54+
_force_shutdown_telemetry()
55+
56+
5057
# Auto-instrumentation and cleanup setup
5158
_initialized = False
5259
_atexit_registered = False

src/praisonai-agents/praisonaiagents/telemetry/telemetry.py

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ def flush(self):
318318
def shutdown(self):
319319
"""
320320
Shutdown telemetry and ensure all events are sent.
321+
Forces proper cleanup of background threads to prevent hanging.
321322
"""
322323
if not self.enabled:
323324
return
@@ -330,8 +331,55 @@ def shutdown(self):
330331
try:
331332
# Force a synchronous flush before shutdown
332333
self._posthog.flush()
334+
335+
# Get the PostHog client's internal thread pool for cleanup
336+
if hasattr(self._posthog, '_thread_pool'):
337+
thread_pool = self._posthog._thread_pool
338+
if thread_pool:
339+
try:
340+
# Stop accepting new tasks
341+
thread_pool.shutdown(wait=False)
342+
# Wait for threads to finish with timeout
343+
thread_pool.shutdown(wait=True)
344+
except:
345+
pass
346+
347+
# Force shutdown of any remaining threads
348+
if hasattr(self._posthog, '_consumer'):
349+
try:
350+
self._posthog._consumer.flush()
351+
self._posthog._consumer.shutdown()
352+
except:
353+
pass
354+
355+
# Standard shutdown
333356
self._posthog.shutdown()
334-
except:
357+
358+
# Additional cleanup - force thread termination
359+
import threading
360+
import time
361+
362+
# Wait up to 2 seconds for threads to terminate
363+
max_wait = 2.0
364+
start_time = time.time()
365+
366+
while time.time() - start_time < max_wait:
367+
# Check for any PostHog related threads
368+
posthog_threads = [
369+
t for t in threading.enumerate()
370+
if t != threading.current_thread()
371+
and not t.daemon
372+
and ('posthog' in t.name.lower() or 'analytics' in t.name.lower())
373+
]
374+
375+
if not posthog_threads:
376+
break
377+
378+
time.sleep(0.1)
379+
380+
except Exception as e:
381+
# Log the error but don't fail shutdown
382+
self.logger.debug(f"Error during PostHog shutdown: {e}")
335383
pass
336384

337385

@@ -361,6 +409,41 @@ def disable_telemetry():
361409
_telemetry_instance = MinimalTelemetry(enabled=False)
362410

363411

412+
def force_shutdown_telemetry():
413+
"""
414+
Force shutdown of telemetry system with comprehensive cleanup.
415+
This function ensures proper termination of all background threads.
416+
"""
417+
global _telemetry_instance
418+
if _telemetry_instance:
419+
_telemetry_instance.shutdown()
420+
421+
# Additional cleanup - wait for all threads to finish
422+
import threading
423+
import time
424+
425+
# Wait up to 3 seconds for any remaining threads to finish
426+
max_wait = 3.0
427+
start_time = time.time()
428+
429+
while time.time() - start_time < max_wait:
430+
# Check for any analytics/telemetry related threads
431+
analytics_threads = [
432+
t for t in threading.enumerate()
433+
if t != threading.current_thread()
434+
and not t.daemon
435+
and any(keyword in t.name.lower() for keyword in ['posthog', 'analytics', 'telemetry', 'consumer'])
436+
]
437+
438+
if not analytics_threads:
439+
break
440+
441+
time.sleep(0.1)
442+
443+
# Reset the global instance
444+
_telemetry_instance = None
445+
446+
364447
def enable_telemetry():
365448
"""Programmatically enable telemetry (if not disabled by environment)."""
366449
global _telemetry_instance

0 commit comments

Comments
 (0)