Commit 5e7bb0c

feat(proxy): add pass-through deployment filtering methods

Add dedicated methods to filter and select deployments for pass-through endpoints:

- Implement get_available_deployment_for_pass_through() so that only deployments with use_in_pass_through=True are considered
- Implement async_get_available_deployment_for_pass_through() for async operations
- Add a _filter_pass_through_deployments() helper method to filter by the use_in_pass_through flag
- Update the Vertex pass-through route to use the new dedicated method

This ensures pass-through endpoints respect the use_in_pass_through configuration and apply the load balancing strategy only to configured deployments. Add comprehensive tests to verify the filtering and load balancing behavior.

1 parent 5ad09a6 commit 5e7bb0c
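
For context, a deployment opts into pass-through routing via the use_in_pass_through flag in its litellm_params. Below is a minimal, hypothetical sketch (model names, project IDs, and the simple-shuffle strategy are placeholders, not part of this commit) of how the new selection method is expected to behave against such a config:

    from litellm import Router

    # Hypothetical model list: only the first deployment opts into pass-through routing.
    router = Router(
        model_list=[
            {
                "model_name": "gemini-pro",
                "litellm_params": {
                    "model": "vertex_ai/gemini-pro",
                    "vertex_project": "my-project",      # placeholder
                    "vertex_location": "us-central1",    # placeholder
                    "use_in_pass_through": True,
                },
            },
            {
                "model_name": "gemini-pro",
                "litellm_params": {
                    "model": "vertex_ai/gemini-pro",
                    "vertex_project": "other-project",   # placeholder
                    # use_in_pass_through omitted -> excluded from pass-through routing
                },
            },
        ],
        routing_strategy="simple-shuffle",
    )

    # Only the deployment with use_in_pass_through=True is eligible for selection here.
    deployment = router.get_available_deployment_for_pass_through(model="gemini-pro")
    print(deployment["litellm_params"]["vertex_project"])  # expected: "my-project"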

3 files changed (+508, -16 lines)

litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py

Lines changed: 4 additions & 4 deletions
@@ -1585,12 +1585,12 @@ async def _base_vertex_proxy_route(

     if llm_router:
         try:
-            deployment = llm_router.get_available_deployment(model=model_id)
+            # Use the dedicated pass-through deployment selection method, which automatically filters for use_in_pass_through=True
+            deployment = llm_router.get_available_deployment_for_pass_through(model=model_id)
             if deployment:
                 litellm_params = deployment.get("litellm_params", {})
-                if litellm_params.get("use_in_pass_through"):
-                    vertex_project = litellm_params.get("vertex_project")
-                    vertex_location = litellm_params.get("vertex_location")
+                vertex_project = litellm_params.get("vertex_project")
+                vertex_location = litellm_params.get("vertex_location")
         except Exception as e:
             verbose_proxy_logger.debug(
                 f"Error getting available deployment for model {model_id}: {e}"

litellm/router.py

Lines changed: 339 additions & 0 deletions
@@ -7779,6 +7779,154 @@ async def async_get_available_deployment(
                     )
             raise e
 
+    async def async_get_available_deployment_for_pass_through(
+        self,
+        model: str,
+        request_kwargs: Dict,
+        messages: Optional[List[Dict[str, str]]] = None,
+        input: Optional[Union[str, List]] = None,
+        specific_deployment: Optional[bool] = False,
+    ):
+        """
+        Async version of get_available_deployment_for_pass_through
+
+        Only returns deployments configured with use_in_pass_through=True
+        """
+        try:
+            parent_otel_span = _get_parent_otel_span_from_kwargs(request_kwargs)
+
+            # 1. Execute pre-routing hook
+            pre_routing_hook_response = await self.async_pre_routing_hook(
+                model=model,
+                request_kwargs=request_kwargs,
+                messages=messages,
+                input=input,
+                specific_deployment=specific_deployment,
+            )
+            if pre_routing_hook_response is not None:
+                model = pre_routing_hook_response.model
+                messages = pre_routing_hook_response.messages
+
+            # 2. Get healthy deployments
+            healthy_deployments = await self.async_get_healthy_deployments(
+                model=model,
+                request_kwargs=request_kwargs,
+                messages=messages,
+                input=input,
+                specific_deployment=specific_deployment,
+                parent_otel_span=parent_otel_span,
+            )
+
+            # 3. If specific deployment returned, verify if it supports pass-through
+            if isinstance(healthy_deployments, dict):
+                litellm_params = healthy_deployments.get("litellm_params", {})
+                if litellm_params.get("use_in_pass_through"):
+                    return healthy_deployments
+                else:
+                    raise litellm.BadRequestError(
+                        message=f"Deployment {healthy_deployments.get('model_info', {}).get('id')} does not support pass-through endpoint (use_in_pass_through=False)",
+                        model=model,
+                        llm_provider="",
+                    )
+
+            # 4. Filter deployments that support pass-through
+            pass_through_deployments = self._filter_pass_through_deployments(
+                healthy_deployments=healthy_deployments
+            )
+
+            if len(pass_through_deployments) == 0:
+                raise litellm.BadRequestError(
+                    message=f"Model {model} has no deployments configured with use_in_pass_through=True. Please add use_in_pass_through: true to the deployment configuration",
+                    model=model,
+                    llm_provider="",
+                )
+
+            # 5. Apply load balancing strategy
+            start_time = time.perf_counter()
+            if (
+                self.routing_strategy == "usage-based-routing-v2"
+                and self.lowesttpm_logger_v2 is not None
+            ):
+                deployment = (
+                    await self.lowesttpm_logger_v2.async_get_available_deployments(
+                        model_group=model,
+                        healthy_deployments=pass_through_deployments,  # type: ignore
+                        messages=messages,
+                        input=input,
+                    )
+                )
+            elif (
+                self.routing_strategy == "latency-based-routing"
+                and self.lowestlatency_logger is not None
+            ):
+                deployment = (
+                    await self.lowestlatency_logger.async_get_available_deployments(
+                        model_group=model,
+                        healthy_deployments=pass_through_deployments,  # type: ignore
+                        messages=messages,
+                        input=input,
+                        request_kwargs=request_kwargs,
+                    )
+                )
+            elif self.routing_strategy == "simple-shuffle":
+                return simple_shuffle(
+                    llm_router_instance=self,
+                    healthy_deployments=pass_through_deployments,
+                    model=model,
+                )
+            elif (
+                self.routing_strategy == "least-busy"
+                and self.leastbusy_logger is not None
+            ):
+                deployment = (
+                    await self.leastbusy_logger.async_get_available_deployments(
+                        model_group=model,
+                        healthy_deployments=pass_through_deployments,  # type: ignore
+                    )
+                )
+            else:
+                deployment = None
+
+            if deployment is None:
+                exception = await async_raise_no_deployment_exception(
+                    litellm_router_instance=self,
+                    model=model,
+                    parent_otel_span=parent_otel_span,
+                )
+                raise exception
+
+            verbose_router_logger.info(
+                f"async_get_available_deployment_for_pass_through model: {model}, selected deployment: {self.print_deployment(deployment)}"
+            )
+
+            end_time = time.perf_counter()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.ROUTER,
+                    duration=_duration,
+                    call_type="<routing_strategy>.async_get_available_deployments",
+                    parent_otel_span=parent_otel_span,
+                    start_time=start_time,
+                    end_time=end_time,
+                )
+            )
+
+            return deployment
+        except Exception as e:
+            traceback_exception = traceback.format_exc()
+            if request_kwargs is not None:
+                logging_obj = request_kwargs.get("litellm_logging_obj", None)
+                if logging_obj is not None:
+                    threading.Thread(
+                        target=logging_obj.failure_handler,
+                        args=(e, traceback_exception),
+                    ).start()
+                    asyncio.create_task(
+                        logging_obj.async_failure_handler(e, traceback_exception)  # type: ignore
+                    )
+            raise e
+
     async def async_pre_routing_hook(
         self,
         model: str,
@@ -7931,6 +8079,169 @@ def get_available_deployment(
         )
         return deployment
 
+    def get_available_deployment_for_pass_through(
+        self,
+        model: str,
+        messages: Optional[List[Dict[str, str]]] = None,
+        input: Optional[Union[str, List]] = None,
+        specific_deployment: Optional[bool] = False,
+        request_kwargs: Optional[Dict] = None,
+    ):
+        """
+        Returns a deployment available for pass-through endpoints (based on the load balancing strategy)
+
+        Similar to get_available_deployment, but only returns deployments with use_in_pass_through=True
+
+        Args:
+            model: Model name
+            messages: Optional list of messages
+            input: Optional input data
+            specific_deployment: Whether to find a specific deployment
+            request_kwargs: Optional request parameters
+
+        Returns:
+            Dict: Selected deployment configuration
+
+        Raises:
+            BadRequestError: If no deployment is configured with use_in_pass_through=True
+            RouterRateLimitError: If no pass-through deployments are available
+        """
+        # 1. Perform common checks to get the healthy deployments list
+        model, healthy_deployments = self._common_checks_available_deployment(
+            model=model,
+            messages=messages,
+            input=input,
+            specific_deployment=specific_deployment,
+        )
+
+        # 2. If a specific deployment (Dict) was returned, verify it and return directly
+        if isinstance(healthy_deployments, dict):
+            litellm_params = healthy_deployments.get("litellm_params", {})
+            if litellm_params.get("use_in_pass_through"):
+                return healthy_deployments
+            else:
+                # Specific deployment does not support pass-through
+                raise litellm.BadRequestError(
+                    message=f"Deployment {healthy_deployments.get('model_info', {}).get('id')} does not support pass-through endpoint (use_in_pass_through=False)",
+                    model=model,
+                    llm_provider="",
+                )
+
+        # 3. Filter deployments that support pass-through
+        pass_through_deployments = self._filter_pass_through_deployments(
+            healthy_deployments=healthy_deployments
+        )
+
+        if len(pass_through_deployments) == 0:
+            # No deployments support pass-through
+            raise litellm.BadRequestError(
+                message=f"Model {model} has no deployment configured with use_in_pass_through=True. Please add use_in_pass_through: true in the deployment configuration",
+                model=model,
+                llm_provider="",
+            )
+
+        # 4. Apply cooldown filtering
+        parent_otel_span: Optional[Span] = _get_parent_otel_span_from_kwargs(
+            request_kwargs
+        )
+        cooldown_deployments = _get_cooldown_deployments(
+            litellm_router_instance=self, parent_otel_span=parent_otel_span
+        )
+        pass_through_deployments = self._filter_cooldown_deployments(
+            healthy_deployments=pass_through_deployments,
+            cooldown_deployments=cooldown_deployments,
+        )
+
+        # 5. Apply pre-call checks (if enabled)
+        if self.enable_pre_call_checks and messages is not None:
+            pass_through_deployments = self._pre_call_checks(
+                model=model,
+                healthy_deployments=pass_through_deployments,
+                messages=messages,
+                request_kwargs=request_kwargs,
+            )
+
+        if len(pass_through_deployments) == 0:
+            model_ids = self.get_model_ids(model_name=model)
+            _cooldown_time = self.cooldown_cache.get_min_cooldown(
+                model_ids=model_ids, parent_otel_span=parent_otel_span
+            )
+            _cooldown_list = _get_cooldown_deployments(
+                litellm_router_instance=self, parent_otel_span=parent_otel_span
+            )
+            raise RouterRateLimitError(
+                model=model,
+                cooldown_time=_cooldown_time,
+                enable_pre_call_checks=self.enable_pre_call_checks,
+                cooldown_list=_cooldown_list,
+            )
+
+        # 6. Apply load balancing strategy
+        if self.routing_strategy == "least-busy" and self.leastbusy_logger is not None:
+            deployment = self.leastbusy_logger.get_available_deployments(
+                model_group=model, healthy_deployments=pass_through_deployments  # type: ignore
+            )
+        elif self.routing_strategy == "simple-shuffle":
+            return simple_shuffle(
+                llm_router_instance=self,
+                healthy_deployments=pass_through_deployments,
+                model=model,
+            )
+        elif (
+            self.routing_strategy == "latency-based-routing"
+            and self.lowestlatency_logger is not None
+        ):
+            deployment = self.lowestlatency_logger.get_available_deployments(
+                model_group=model,
+                healthy_deployments=pass_through_deployments,  # type: ignore
+                request_kwargs=request_kwargs,
+            )
+        elif (
+            self.routing_strategy == "usage-based-routing"
+            and self.lowesttpm_logger is not None
+        ):
+            deployment = self.lowesttpm_logger.get_available_deployments(
+                model_group=model,
+                healthy_deployments=pass_through_deployments,  # type: ignore
+                messages=messages,
+                input=input,
+            )
+        elif (
+            self.routing_strategy == "usage-based-routing-v2"
+            and self.lowesttpm_logger_v2 is not None
+        ):
+            deployment = self.lowesttpm_logger_v2.get_available_deployments(
+                model_group=model,
+                healthy_deployments=pass_through_deployments,  # type: ignore
+                messages=messages,
+                input=input,
+            )
+        else:
+            deployment = None
+
+        if deployment is None:
+            verbose_router_logger.info(
+                f"get_available_deployment_for_pass_through model: {model}, no available deployments"
+            )
+            model_ids = self.get_model_ids(model_name=model)
+            _cooldown_time = self.cooldown_cache.get_min_cooldown(
+                model_ids=model_ids, parent_otel_span=parent_otel_span
+            )
+            _cooldown_list = _get_cooldown_deployments(
+                litellm_router_instance=self, parent_otel_span=parent_otel_span
+            )
+            raise RouterRateLimitError(
+                model=model,
+                cooldown_time=_cooldown_time,
+                enable_pre_call_checks=self.enable_pre_call_checks,
+                cooldown_list=_cooldown_list,
+            )
+
+        verbose_router_logger.info(
+            f"get_available_deployment_for_pass_through model: {model}, selected deployment: {self.print_deployment(deployment)}"
+        )
+        return deployment
+
     def _filter_cooldown_deployments(
         self, healthy_deployments: List[Dict], cooldown_deployments: List[str]
     ) -> List[Dict]:
@@ -7953,6 +8264,34 @@ def _filter_cooldown_deployments(
             if deployment["model_info"]["id"] not in cooldown_set
         ]
 
+    def _filter_pass_through_deployments(
+        self, healthy_deployments: List[Dict]
+    ) -> List[Dict]:
+        """
+        Filter deployments, keeping only those configured with use_in_pass_through=True
+
+        Args:
+            healthy_deployments: List of healthy deployments
+
+        Returns:
+            List[Dict]: Deployments that support pass-through
+        """
+        verbose_router_logger.debug(
+            f"Filter pass-through deployments from {len(healthy_deployments)} healthy deployments"
+        )
+
+        pass_through_deployments = [
+            deployment
+            for deployment in healthy_deployments
+            if deployment.get("litellm_params", {}).get("use_in_pass_through", False)
+        ]
+
+        verbose_router_logger.debug(
+            f"Found {len(pass_through_deployments)} deployments with pass-through enabled"
+        )
+
+        return pass_through_deployments
+
     def _track_deployment_metrics(
         self, deployment, parent_otel_span: Optional[Span], response=None
     ):
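
The helper's filtering semantics can be shown in isolation. A minimal standalone sketch (hypothetical deployment dicts, not the commit's actual test suite) mirroring _filter_pass_through_deployments:

    from typing import Dict, List

    def filter_pass_through_deployments(healthy_deployments: List[Dict]) -> List[Dict]:
        # Keep only deployments whose litellm_params set use_in_pass_through=True;
        # a missing flag defaults to False, so those deployments are dropped.
        return [
            d
            for d in healthy_deployments
            if d.get("litellm_params", {}).get("use_in_pass_through", False)
        ]

    deployments = [
        {"model_info": {"id": "a"}, "litellm_params": {"use_in_pass_through": True}},
        {"model_info": {"id": "b"}, "litellm_params": {}},  # flag missing -> dropped
    ]
    assert [d["model_info"]["id"] for d in filter_pass_through_deployments(deployments)] == ["a"]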
