Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 91 additions & 8 deletions src/praisonai-agents/praisonaiagents/telemetry/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ class MinimalTelemetry:
- Can be disabled via environment variables
"""

# Common error phrases that indicate interpreter shutdown
_SHUTDOWN_ERROR_PHRASES = [
'cannot schedule new futures',
'interpreter shutdown',
'atexit after shutdown',
'event loop closed',
'runtime is shutting down'
]

def __init__(self, enabled: bool = None):
"""
Initialize the minimal telemetry collector.
Expand Down Expand Up @@ -347,6 +356,11 @@ def shutdown(self):
posthog_client = getattr(self, '_posthog', None)
if posthog_client:
try:
# Check if Python interpreter is shutting down
if self._is_interpreter_shutting_down():
self.logger.debug("Interpreter shutting down, skipping PostHog operations")
return

# Use a timeout-based flush to prevent hanging
import threading
import time
Expand All @@ -368,27 +382,87 @@ def shutdown(self):
# Cleanup PostHog threads safely
self._shutdown_posthog_threads(posthog_client)

# Standard shutdown
posthog_client.shutdown()
# Standard shutdown - with interpreter shutdown check
if not self._is_interpreter_shutting_down():
posthog_client.shutdown()
else:
self.logger.debug("Skipping PostHog shutdown call due to interpreter shutdown")

except Exception as e:
# Log the error but don't fail shutdown
self.logger.error(f"Error during PostHog shutdown: {e}")
# Handle specific shutdown-related errors gracefully
if self._is_shutdown_related_error(e):
self.logger.debug(f"PostHog shutdown prevented due to interpreter shutdown: {e}")
else:
self.logger.error(f"Error during PostHog shutdown: {e}")
finally:
self._posthog = None

def _is_shutdown_related_error(self, error: Exception) -> bool:
"""
Check if an error is related to interpreter shutdown.

Args:
error: The exception to check

Returns:
True if the error is shutdown-related, False otherwise
"""
error_msg = str(error).lower()
return any(phrase in error_msg for phrase in self._SHUTDOWN_ERROR_PHRASES)

def _is_interpreter_shutting_down(self) -> bool:
"""
Check if the Python interpreter is shutting down.

Returns:
True if interpreter is shutting down, False otherwise
"""
try:
import sys

# Check if the interpreter is in shutdown mode
if hasattr(sys, 'is_finalizing') and sys.is_finalizing():
return True

# Check if we can create new threads (fails during shutdown)
try:
test_thread = threading.Thread(target=lambda: None)
test_thread.daemon = True
test_thread.start()
test_thread.join(timeout=0.001)
return False
except (RuntimeError, threading.ThreadError):
return True

except Exception:
# If we can't determine state, assume we're shutting down to be safe
return True

def _safe_flush_posthog(self, posthog_client):
"""Safely flush PostHog data with error handling."""
try:
# Skip flush if interpreter is shutting down
if self._is_interpreter_shutting_down():
self.logger.debug("Skipping PostHog flush due to interpreter shutdown")
return False

posthog_client.flush()
return True
except Exception as e:
self.logger.debug(f"PostHog flush error: {e}")
if self._is_shutdown_related_error(e):
self.logger.debug(f"PostHog flush prevented due to interpreter shutdown: {e}")
else:
self.logger.debug(f"PostHog flush error: {e}")
return False

def _shutdown_posthog_threads(self, posthog_client):
"""Safely shutdown PostHog background threads."""
try:
# Skip thread cleanup if interpreter is shutting down
if self._is_interpreter_shutting_down():
self.logger.debug("Skipping PostHog thread cleanup due to interpreter shutdown")
return

# Access thread pool safely (fix double shutdown issue)
thread_pool = getattr(posthog_client, '_thread_pool', None)
if thread_pool:
Expand All @@ -400,7 +474,10 @@ def _shutdown_posthog_threads(self, posthog_client):
import time
time.sleep(0.5)
except Exception as e:
self.logger.debug(f"Thread pool shutdown error: {e}")
if self._is_shutdown_related_error(e):
self.logger.debug(f"Thread pool shutdown prevented due to interpreter shutdown: {e}")
else:
self.logger.debug(f"Thread pool shutdown error: {e}")

# Clean up consumer
consumer = getattr(posthog_client, '_consumer', None)
Expand All @@ -411,10 +488,16 @@ def _shutdown_posthog_threads(self, posthog_client):
if hasattr(consumer, 'shutdown'):
consumer.shutdown()
except Exception as e:
self.logger.debug(f"Consumer shutdown error: {e}")
if self._is_shutdown_related_error(e):
self.logger.debug(f"Consumer shutdown prevented due to interpreter shutdown: {e}")
else:
self.logger.debug(f"Consumer shutdown error: {e}")

except Exception as e:
self.logger.debug(f"Error during PostHog thread cleanup: {e}")
if self._is_shutdown_related_error(e):
self.logger.debug(f"PostHog thread cleanup prevented due to interpreter shutdown: {e}")
else:
self.logger.debug(f"Error during PostHog thread cleanup: {e}")


# Global telemetry instance
Expand Down