1+ """
2+ Advanced metrics tracking for PraisonAI Agents.
3+
4+ This module provides comprehensive token and performance tracking
5+ with session-level aggregation and export capabilities.
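
Illustrative usage (a minimal sketch; ``response`` and the agent/model names
below are placeholders, not part of this module):

    collector = MetricsCollector()

    perf = PerformanceMetrics()
    perf.start_timing()
    # ... call the LLM, invoking perf.mark_first_token() on the first streamed chunk ...
    tokens = TokenMetrics.from_completion_usage(response.usage)
    perf.end_timing(token_count=tokens.output_tokens)

    collector.add_agent_metrics("research_agent", tokens,
                                performance_metrics=perf, model_name="gpt-4o-mini")
    collector.export_metrics("session_metrics.json")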
6+ """
7+
8+ import time
9+ import json
10+ from dataclasses import dataclass , asdict
11+ from typing import Dict , Any , Optional , List , Union
12+ from datetime import datetime
13+ from pathlib import Path
14+
15+ @dataclass
class TokenMetrics:
    """Comprehensive token tracking for all token types."""
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    # Special tokens
    audio_tokens: int = 0
    input_audio_tokens: int = 0
    output_audio_tokens: int = 0
    cached_tokens: int = 0
    cache_write_tokens: int = 0
    reasoning_tokens: int = 0

    def __add__(self, other: 'TokenMetrics') -> 'TokenMetrics':
        """Enable metric aggregation."""
        return TokenMetrics(
            input_tokens=self.input_tokens + other.input_tokens,
            output_tokens=self.output_tokens + other.output_tokens,
            total_tokens=self.total_tokens + other.total_tokens,
            audio_tokens=self.audio_tokens + other.audio_tokens,
            input_audio_tokens=self.input_audio_tokens + other.input_audio_tokens,
            output_audio_tokens=self.output_audio_tokens + other.output_audio_tokens,
            cached_tokens=self.cached_tokens + other.cached_tokens,
            cache_write_tokens=self.cache_write_tokens + other.cache_write_tokens,
            reasoning_tokens=self.reasoning_tokens + other.reasoning_tokens,
        )

    def update_totals(self):
        """Update total_tokens based on input and output tokens."""
        self.total_tokens = self.input_tokens + self.output_tokens

    @classmethod
    def from_completion_usage(cls, usage: Any) -> 'TokenMetrics':
        """Create TokenMetrics from an OpenAI CompletionUsage object."""
        metrics = cls()

        if hasattr(usage, 'prompt_tokens'):
            metrics.input_tokens = usage.prompt_tokens or 0
        if hasattr(usage, 'completion_tokens'):
            metrics.output_tokens = usage.completion_tokens or 0
        if hasattr(usage, 'total_tokens'):
            metrics.total_tokens = usage.total_tokens or 0

        # Handle audio tokens if present
        if hasattr(usage, 'prompt_tokens_details'):
            details = usage.prompt_tokens_details
            if hasattr(details, 'audio_tokens'):
                metrics.input_audio_tokens = details.audio_tokens or 0
                metrics.audio_tokens += metrics.input_audio_tokens
            if hasattr(details, 'cached_tokens'):
                metrics.cached_tokens = details.cached_tokens or 0

        if hasattr(usage, 'completion_tokens_details'):
            details = usage.completion_tokens_details
            if hasattr(details, 'audio_tokens'):
                metrics.output_audio_tokens = details.audio_tokens or 0
                metrics.audio_tokens += metrics.output_audio_tokens
            if hasattr(details, 'reasoning_tokens'):
                metrics.reasoning_tokens = details.reasoning_tokens or 0

        # Update total if not provided
        if metrics.total_tokens == 0:
            metrics.update_totals()

        return metrics

@dataclass
class PerformanceMetrics:
    """Performance tracking including TTFT and response times."""
    time_to_first_token: float = 0.0  # Time to first token in seconds
    total_time: float = 0.0  # Total generation time in seconds
    tokens_per_second: float = 0.0  # Tokens generated per second
    start_time: Optional[float] = None
    first_token_time: Optional[float] = None
    end_time: Optional[float] = None

    def start_timing(self):
        """Start timing for this request."""
        self.start_time = time.time()

    def mark_first_token(self):
        """Mark when the first token was received."""
        if self.start_time:
            self.first_token_time = time.time()
            self.time_to_first_token = self.first_token_time - self.start_time

    def end_timing(self, token_count: int = 0):
        """End timing and calculate final metrics."""
        if self.start_time:
            self.end_time = time.time()
            self.total_time = self.end_time - self.start_time

            # Calculate tokens per second if we have a token count
            if token_count > 0 and self.total_time > 0:
                self.tokens_per_second = token_count / self.total_time

class MetricsCollector:
    """Session-level metric aggregation and export."""

    def __init__(self):
        self.session_id = f"session_{int(time.time())}_{id(self)}"
        self.start_time = datetime.now()
        self.agent_metrics: Dict[str, TokenMetrics] = {}
        self.agent_performance: Dict[str, List[PerformanceMetrics]] = {}
        self.model_metrics: Dict[str, TokenMetrics] = {}
        self.total_metrics = TokenMetrics()

    def add_agent_metrics(self, agent_name: str, token_metrics: TokenMetrics,
                          performance_metrics: Optional[PerformanceMetrics] = None,
                          model_name: Optional[str] = None):
        """Add metrics for a specific agent."""
        # Aggregate by agent
        if agent_name not in self.agent_metrics:
            self.agent_metrics[agent_name] = TokenMetrics()
        self.agent_metrics[agent_name] += token_metrics

        # Track performance metrics
        if performance_metrics:
            if agent_name not in self.agent_performance:
                self.agent_performance[agent_name] = []
            self.agent_performance[agent_name].append(performance_metrics)

        # Aggregate by model
        if model_name:
            if model_name not in self.model_metrics:
                self.model_metrics[model_name] = TokenMetrics()
            self.model_metrics[model_name] += token_metrics

        # Update total
        self.total_metrics += token_metrics

    def get_session_metrics(self) -> Dict[str, Any]:
        """Get aggregated session metrics."""
        # Calculate average performance metrics per agent
        avg_performance = {}
        for agent_name, perf_list in self.agent_performance.items():
            if perf_list:
                avg_ttft = sum(p.time_to_first_token for p in perf_list) / len(perf_list)
                avg_total_time = sum(p.total_time for p in perf_list) / len(perf_list)

                # Average tokens/second only over requests that reported a rate
                timed = [p for p in perf_list if p.tokens_per_second > 0]
                avg_tps = sum(p.tokens_per_second for p in timed) / len(timed) if timed else 0.0

                avg_performance[agent_name] = {
                    "average_ttft": avg_ttft,
                    "average_total_time": avg_total_time,
                    "average_tokens_per_second": avg_tps,
                    "request_count": len(perf_list)
                }

        return {
            "session_id": self.session_id,
            "start_time": self.start_time.isoformat(),
            "duration_seconds": (datetime.now() - self.start_time).total_seconds(),
            "total_tokens": asdict(self.total_metrics),
            "by_agent": {name: asdict(metrics) for name, metrics in self.agent_metrics.items()},
            "by_model": {name: asdict(metrics) for name, metrics in self.model_metrics.items()},
            "performance": avg_performance
        }

    def export_metrics(self, file_path: Union[str, Path], format: str = "json"):
        """Export metrics to a file."""
        metrics = self.get_session_metrics()

        file_path = Path(file_path)

        if format.lower() == "json":
            with open(file_path, 'w') as f:
                json.dump(metrics, f, indent=2, default=str)
        else:
            raise ValueError(f"Unsupported export format: {format}")

    def reset(self):
        """Reset all metrics for a new session."""
        self.session_id = f"session_{int(time.time())}_{id(self)}"
        self.start_time = datetime.now()
        self.agent_metrics.clear()
        self.agent_performance.clear()
        self.model_metrics.clear()
        self.total_metrics = TokenMetrics()
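
if __name__ == "__main__":
    # Minimal smoke-test demo (illustrative only; the agent name, model name,
    # and token counts below are synthetic placeholders, not real API usage).
    demo_tokens = TokenMetrics(input_tokens=120, output_tokens=80)
    demo_tokens.update_totals()

    demo_perf = PerformanceMetrics()
    demo_perf.start_timing()
    demo_perf.mark_first_token()
    demo_perf.end_timing(token_count=demo_tokens.output_tokens)

    collector = MetricsCollector()
    collector.add_agent_metrics("demo_agent", demo_tokens,
                                performance_metrics=demo_perf,
                                model_name="demo-model")
    print(json.dumps(collector.get_session_metrics(), indent=2, default=str))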