Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 59 additions & 90 deletions litellm/integrations/prometheus.py
Original file line number Diff line number Diff line change
Expand Up @@ -1097,9 +1097,11 @@ async def async_log_success_event(self, kwargs, response_obj, start_time, end_ti
),
client_ip=standard_logging_payload["metadata"].get("requester_ip_address"),
user_agent=standard_logging_payload["metadata"].get("user_agent"),
stream=str(standard_logging_payload.get("stream"))
if litellm.prometheus_emit_stream_label
else None,
stream=(
str(standard_logging_payload.get("stream"))
if litellm.prometheus_emit_stream_label
else None
),
)

if (
Expand Down Expand Up @@ -1317,26 +1319,39 @@ async def _increment_remaining_budget_metrics(
_user_spend = _metadata.get("user_api_key_user_spend", None)
_user_max_budget = _metadata.get("user_api_key_user_max_budget", None)

_key_budget_reset_at = self._parse_budget_reset_at(
_metadata.get("user_api_key_budget_reset_at")
)
_team_budget_reset_at = self._parse_budget_reset_at(
_metadata.get("user_api_key_team_budget_reset_at")
)
_user_budget_reset_at = self._parse_budget_reset_at(
_metadata.get("user_api_key_user_budget_reset_at")
)

results = await asyncio.gather(
self._set_api_key_budget_metrics_after_api_request(
user_api_key=user_api_key,
user_api_key_alias=user_api_key_alias,
response_cost=response_cost,
key_max_budget=_api_key_max_budget,
key_spend=_api_key_spend,
budget_reset_at=_key_budget_reset_at,
),
self._set_team_budget_metrics_after_api_request(
user_api_team=user_api_team,
user_api_team_alias=user_api_team_alias,
team_spend=_team_spend,
team_max_budget=_team_max_budget,
response_cost=response_cost,
budget_reset_at=_team_budget_reset_at,
),
self._set_user_budget_metrics_after_api_request(
user_id=user_id,
user_spend=_user_spend,
user_max_budget=_user_max_budget,
response_cost=response_cost,
budget_reset_at=_user_budget_reset_at,
),
self._set_org_budget_metrics_after_api_request(
org_id=user_api_key_org_id,
Expand Down Expand Up @@ -1767,9 +1782,11 @@ async def async_post_call_failure_hook(
client_ip=_metadata.get("requester_ip_address"),
user_agent=_metadata.get("user_agent"),
model_id=model_id,
stream=str(request_data.get("stream"))
if litellm.prometheus_emit_stream_label
else None,
stream=(
str(request_data.get("stream"))
if litellm.prometheus_emit_stream_label
else None
),
)
_labels = prometheus_label_factory(
supported_enum_labels=self.get_labels_for_metric(
Expand Down Expand Up @@ -2093,9 +2110,9 @@ def set_llm_deployment_success_metrics(
):
try:
verbose_logger.debug("setting remaining tokens requests metric")
standard_logging_payload: Optional[
StandardLoggingPayload
] = request_kwargs.get("standard_logging_object")
standard_logging_payload: Optional[StandardLoggingPayload] = (
request_kwargs.get("standard_logging_object")
)

if standard_logging_payload is None:
return
Expand Down Expand Up @@ -2638,7 +2655,7 @@ async def _initialize_budget_metrics(
self,
data_fetch_function: Callable[..., Awaitable[Tuple[List[Any], Optional[int]]]],
set_metrics_function: Callable[[List[Any]], Awaitable[None]],
data_type: Literal["teams", "keys", "users"],
data_type: Literal["teams", "keys", "users", "orgs"],
):
"""
Generic method to initialize budget metrics for teams or API keys.
Expand Down Expand Up @@ -2728,9 +2745,7 @@ async def _initialize_api_key_budget_metrics(self):
)
return

async def fetch_keys(
page_size: int, page: int
) -> Tuple[
async def fetch_keys(page_size: int, page: int) -> Tuple[
List[Union[str, UserAPIKeyAuth, LiteLLM_DeletedVerificationToken]],
Optional[int],
]:
Expand Down Expand Up @@ -2921,9 +2936,11 @@ async def _set_org_list_budget_metrics(self, orgs: list):
org_alias=org.organization_alias or "",
spend=org.spend or 0.0,
max_budget=budget_table.max_budget if budget_table else None,
budget_reset_at=getattr(budget_table, "budget_reset_at", None)
if budget_table
else None,
budget_reset_at=(
getattr(budget_table, "budget_reset_at", None)
if budget_table
else None
),
)

async def _set_team_budget_metrics_after_api_request(
Expand All @@ -2933,6 +2950,7 @@ async def _set_team_budget_metrics_after_api_request(
team_spend: Optional[float],
team_max_budget: Optional[float],
response_cost: float,
budget_reset_at: Optional[datetime] = None,
):
"""
Set team budget metrics after an LLM API request
Expand All @@ -2948,6 +2966,7 @@ async def _set_team_budget_metrics_after_api_request(
spend=team_spend,
max_budget=team_max_budget,
response_cost=response_cost,
budget_reset_at=budget_reset_at,
)

self._set_team_budget_metrics(team_object)
Expand All @@ -2959,41 +2978,19 @@ async def _assemble_team_object(
spend: Optional[float],
max_budget: Optional[float],
response_cost: float,
budget_reset_at: Optional[datetime] = None,
) -> LiteLLM_TeamTable:
"""
Assemble a LiteLLM_TeamTable object

for fields not available in metadata, we fetch from db
Fields not available in metadata:
- `budget_reset_at`
"""
from litellm.proxy.auth.auth_checks import get_team_object
from litellm.proxy.proxy_server import prisma_client, user_api_key_cache

_total_team_spend = (spend or 0) + response_cost
team_object = LiteLLM_TeamTable(
team_id=team_id,
team_alias=team_alias,
spend=_total_team_spend,
max_budget=max_budget,
)
try:
team_info = await get_team_object(
team_id=team_id,
prisma_client=prisma_client,
user_api_key_cache=user_api_key_cache,
)
except Exception as e:
verbose_logger.debug(
f"[Non-Blocking] Prometheus: Error getting team info: {str(e)}"
)
return team_object

if team_info:
team_object.budget_reset_at = team_info.budget_reset_at
if team_object.max_budget is None and team_info.max_budget is not None:
team_object.max_budget = team_info.max_budget

team_object.budget_reset_at = budget_reset_at
return team_object

def _set_team_budget_metrics(
Expand Down Expand Up @@ -3204,6 +3201,7 @@ async def _set_api_key_budget_metrics_after_api_request(
response_cost: float,
key_max_budget: Optional[float],
key_spend: Optional[float],
budget_reset_at: Optional[datetime] = None,
):
if user_api_key:
user_api_key_dict = await self._assemble_key_object(
Expand All @@ -3212,6 +3210,7 @@ async def _set_api_key_budget_metrics_after_api_request(
key_max_budget=key_max_budget,
key_spend=key_spend,
response_cost=response_cost,
budget_reset_at=budget_reset_at,
)
self._set_key_budget_metrics(user_api_key_dict)

Expand All @@ -3222,34 +3221,19 @@ async def _assemble_key_object(
key_max_budget: Optional[float],
key_spend: Optional[float],
response_cost: float,
budget_reset_at: Optional[datetime] = None,
) -> UserAPIKeyAuth:
"""
Assemble a UserAPIKeyAuth object
"""
from litellm.proxy.auth.auth_checks import get_key_object
from litellm.proxy.proxy_server import prisma_client, user_api_key_cache

_total_key_spend = (key_spend or 0) + response_cost
user_api_key_dict = UserAPIKeyAuth(
token=user_api_key,
key_alias=user_api_key_alias,
max_budget=key_max_budget,
spend=_total_key_spend,
)
try:
if user_api_key_dict.token:
key_object = await get_key_object(
hashed_token=user_api_key_dict.token,
prisma_client=prisma_client,
user_api_key_cache=user_api_key_cache,
)
if key_object:
user_api_key_dict.budget_reset_at = key_object.budget_reset_at
except Exception as e:
verbose_logger.debug(
f"[Non-Blocking] Prometheus: Error getting key info: {str(e)}"
)

user_api_key_dict.budget_reset_at = budget_reset_at
return user_api_key_dict

async def _set_user_budget_metrics_after_api_request(
Expand All @@ -3258,12 +3242,12 @@ async def _set_user_budget_metrics_after_api_request(
user_spend: Optional[float],
user_max_budget: Optional[float],
response_cost: float,
budget_reset_at: Optional[datetime] = None,
):
"""
Set user budget metrics after an LLM API request

- Assemble a LiteLLM_UserTable object
- looks up user info from db if not available in metadata
- Set user budget metrics
"""
if user_id:
Expand All @@ -3272,6 +3256,7 @@ async def _set_user_budget_metrics_after_api_request(
spend=user_spend,
max_budget=user_max_budget,
response_cost=response_cost,
budget_reset_at=budget_reset_at,
)

self._set_user_budget_metrics(user_object)
Expand All @@ -3282,44 +3267,18 @@ async def _assemble_user_object(
spend: Optional[float],
max_budget: Optional[float],
response_cost: float,
budget_reset_at: Optional[datetime] = None,
) -> LiteLLM_UserTable:
"""
Assemble a LiteLLM_UserTable object

for fields not available in metadata, we fetch from db
Fields not available in metadata:
- `budget_reset_at`
"""
from litellm.proxy.auth.auth_checks import get_user_object
from litellm.proxy.proxy_server import prisma_client, user_api_key_cache

_total_user_spend = (spend or 0) + response_cost
user_object = LiteLLM_UserTable(
user_id=user_id,
spend=_total_user_spend,
max_budget=max_budget,
)
try:
# Note: Setting check_db_only=True bypasses cache and hits DB on every request,
# causing huge latency increase and CPU spikes. Keep check_db_only=False.
user_info = await get_user_object(
user_id=user_id,
prisma_client=prisma_client,
user_api_key_cache=user_api_key_cache,
user_id_upsert=False,
check_db_only=False,
)
except Exception as e:
verbose_logger.debug(
f"[Non-Blocking] Prometheus: Error getting user info: {str(e)}"
)
return user_object

if user_info:
user_object.budget_reset_at = user_info.budget_reset_at
if user_object.max_budget is None and user_info.max_budget is not None:
user_object.max_budget = user_info.max_budget

user_object.budget_reset_at = budget_reset_at
return user_object

def _set_user_budget_metrics(
Expand Down Expand Up @@ -3372,6 +3331,16 @@ def _set_user_budget_metrics(
)
)

@staticmethod
def _parse_budget_reset_at(value: Optional[str]) -> Optional[datetime]:
"""Parse an ISO datetime string from request metadata into a datetime object."""
if not value:
return None
try:
return datetime.fromisoformat(value.replace("Z", "+00:00"))
except (ValueError, AttributeError):
return None

def _get_remaining_hours_for_budget_reset(self, budget_reset_at: datetime) -> float:
"""
Get remaining hours for budget reset
Expand Down Expand Up @@ -3405,10 +3374,10 @@ def initialize_budget_metrics_cron_job(scheduler: AsyncIOScheduler):
from litellm.constants import PROMETHEUS_BUDGET_METRICS_REFRESH_INTERVAL_MINUTES
from litellm.integrations.custom_logger import CustomLogger

prometheus_loggers: List[
CustomLogger
] = litellm.logging_callback_manager.get_custom_loggers_for_type(
callback_type=PrometheusLogger
prometheus_loggers: List[CustomLogger] = (
litellm.logging_callback_manager.get_custom_loggers_for_type(
callback_type=PrometheusLogger
)
)
# we need to get the initialized prometheus logger instance(s) and call logger.initialize_remaining_budget_metrics() on them
verbose_logger.debug("found %s prometheus loggers", len(prometheus_loggers))
Expand Down
Loading
Loading