[serve.llm] remove asyncache and cachetools from dependencies. (#50806)
This PR replaces the asyncache/cachetools-based async cache decorator with a
lightweight internal utility in order to reduce the dependency depth of the
llm image.
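
For reference, the swap at the call site looks roughly like this (a sketch distilled from the diff below; the CLOUD_OBJECT_* constants are the existing values in multiplex/utils.py):

# Before: TTU caching via the third-party asyncache/cachetools packages.
@cached(
    cache=TLRUCache(
        maxsize=4096,
        getsizeof=lambda x: 1,
        ttu=_validate_model_ttu,
        timer=time.monotonic,
    )
)
async def get_object_from_cloud(object_uri): ...

# After: the same TTU behavior from the new internal decorator.
@remote_object_cache(
    max_size=4096,
    missing_expire_seconds=CLOUD_OBJECT_MISSING_EXPIRE_S,
    exists_expire_seconds=CLOUD_OBJECT_EXISTS_EXPIRE_S,
    missing_object_value=CLOUD_OBJECT_MISSING,
)
async def get_object_from_cloud(object_uri): ...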

---------

Signed-off-by: Gene Su <e870252314@gmail.com>
Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
Signed-off-by: khluu <51931015+khluu@users.noreply.github.com>
Co-authored-by: Gene Su <e870252314@gmail.com>
Co-authored-by: khluu <51931015+khluu@users.noreply.github.com>
3 people authored Feb 23, 2025
1 parent d5b51d9 commit 03308ae
Showing 13 changed files with 477 additions and 70 deletions.
1 change: 0 additions & 1 deletion doc/source/conf.py
@@ -583,7 +583,6 @@ def setup(app):
 autodoc_mock_imports = [
     "aiohttp",
     "aiosignal",
-    "asyncache",
     "async_timeout",
     "backoff",
     "cachetools",
43 changes: 13 additions & 30 deletions python/ray/llm/_internal/serve/deployments/llm/multiplex/utils.py
@@ -1,11 +1,7 @@
 import json
 import subprocess
-import time
 from typing import Any, Dict, List, Optional, Tuple, Union
 
-# TODO (genesu): remove dependency on asyncache.
-from asyncache import cached
-from cachetools import TLRUCache
 from fastapi import HTTPException
 from filelock import FileLock
 
@@ -17,6 +13,7 @@
     get_file_from_s3,
     list_subfolders_gcs,
     list_subfolders_s3,
+    remote_object_cache,
 )
 from ray.llm._internal.serve.configs.server_models import (
     LLMConfig,
@@ -149,21 +146,6 @@ def sync_model(
         raise
 
 
-def _validate_model_ttu(key, value, now):
-    # Return the expiration time depending on value
-    # (now + some offset)
-    # For get_lora_finetuned_context_length, we want to periodically re-check if
-    # the files are available if they weren't before (because they
-    # might have been uploaded in the meantime).
-    # If they were uploaded and we cached the sequence length response,
-    # then we just need to guard against users deleting the files
-    # from the bucket, which shouldn't happen often.
-    if value is CLOUD_OBJECT_MISSING:
-        return now + CLOUD_OBJECT_MISSING_EXPIRE_S
-    else:
-        return now + CLOUD_OBJECT_EXISTS_EXPIRE_S
-
-
 @make_async
 def _get_object_from_cloud(object_uri: str) -> Union[str, object]:
     """Gets an object from the cloud.
@@ -193,21 +175,22 @@ def _get_object_from_cloud(object_uri: str) -> Union[str, object]:
         return body_str
 
 
-@cached(
-    cache=TLRUCache(
-        maxsize=4096,
-        getsizeof=lambda x: 1,
-        ttu=_validate_model_ttu,
-        timer=time.monotonic,
-    )
+@remote_object_cache(
+    max_size=4096,
+    missing_expire_seconds=CLOUD_OBJECT_MISSING_EXPIRE_S,
+    exists_expire_seconds=CLOUD_OBJECT_EXISTS_EXPIRE_S,
+    missing_object_value=CLOUD_OBJECT_MISSING,
 )
 async def get_object_from_cloud(object_uri: str) -> Union[str, object]:
-    """Calls _get_object_from_cloud with caching.
+    """Gets an object from the cloud with caching.
 
     We separate the caching logic from the implementation, so the
     implementation can be faked while testing the caching logic in unit tests.
-    """
+
+    The cache will store missing objects for a short time and existing objects for
+    a longer time. This prevents unnecessary cloud API calls when objects don't exist
+    while ensuring we don't cache missing objects for too long in case they get created.
+
+    Returns:
+        The body of the object if it exists, or CLOUD_OBJECT_MISSING if it doesn't.
+    """
     return await _get_object_from_cloud(object_uri)


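A minimal sketch of how the cached accessor behaves (the URI below is hypothetical; get_object_from_cloud, CLOUD_OBJECT_MISSING, and the expiry constants come from the module above):

import asyncio

async def main():
    # First call hits cloud storage; repeated calls within the TTL are served
    # from the in-process cache.
    body = await get_object_from_cloud("s3://my-bucket/config.json")  # hypothetical URI
    if body is CLOUD_OBJECT_MISSING:
        # Missing objects expire after CLOUD_OBJECT_MISSING_EXPIRE_S seconds,
        # so a later upload is picked up quickly.
        print("object not found")
    else:
        print(body)

asyncio.run(main())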
192 changes: 191 additions & 1 deletion python/ray/llm/_internal/serve/deployments/utils/cloud_utils.py
@@ -1,15 +1,32 @@
 # TODO (genesu): clean up these utils.
-from typing import List, Optional, Tuple, Union, Dict
+from typing import (
+    List,
+    Optional,
+    Tuple,
+    Union,
+    Dict,
+    Any,
+    Callable,
+    Awaitable,
+    TypeVar,
+    NamedTuple,
+)
 import os
 
 # TODO (genesu): remove dependency on boto3. Lazy import in the functions.
 import boto3
 import requests
 import subprocess
+import time
+import inspect
+import asyncio
 
 from ray.llm._internal.serve.observability.logging import get_logger
 from ray.llm._internal.serve.configs.server_models import S3AWSCredentials
 
+
+T = TypeVar("T")
+
 logger = get_logger(__name__)
 
 AWS_EXECUTABLE = "aws"
@@ -537,3 +554,176 @@ def get_aws_credentials(

    env = resp.json()
    return env


class CacheEntry(NamedTuple):
    value: Any
    expire_time: Optional[float]


class CloudObjectCache:
    """A cache that works with both sync and async fetch functions.

    The purpose of this data structure is to cache the result of a function call,
    usually one that fetches a value from a cloud object store.

    The idea is this:
    - Cloud operations are expensive.
    - In LoRA specifically, we would hit remote storage to download the model weights
      on each request.
    - If the same model is requested many times, we don't want to inflate the time to first token.
    - We control the cache not only by bounding its size, but also by expiring
      entries after a certain time.
    - If the object is missing, we cache the missing status for a short duration, while if
      the object exists, we cache the object for a longer duration.
    """

    def __init__(
        self,
        max_size: int,
        fetch_fn: Union[Callable[[str], Any], Callable[[str], Awaitable[Any]]],
        missing_expire_seconds: Optional[int] = None,
        exists_expire_seconds: Optional[int] = None,
        missing_object_value: Any = object(),
    ):
        """Initialize the cache.

        Args:
            max_size: Maximum number of items to store in the cache.
            fetch_fn: Function to fetch values (can be sync or async).
            missing_expire_seconds: How long to cache missing objects (None for no expiration).
            exists_expire_seconds: How long to cache existing objects (None for no expiration).
            missing_object_value: Sentinel value used to mark missing objects.
        """
        self._cache: Dict[str, CacheEntry] = {}
        self._max_size = max_size
        self._fetch_fn = fetch_fn
        self._missing_expire_seconds = missing_expire_seconds
        self._exists_expire_seconds = exists_expire_seconds
        self._is_async = inspect.iscoroutinefunction(fetch_fn) or (
            callable(fetch_fn) and inspect.iscoroutinefunction(fetch_fn.__call__)
        )
        self._missing_object_value = missing_object_value
        # Async lock to serialize cache access from concurrent coroutines.
        self._lock = asyncio.Lock()

    async def aget(self, key: str) -> Any:
        """Async get value from cache or fetch it if needed."""
        if not self._is_async:
            raise ValueError("Cannot use async get() with sync fetch function")

        async with self._lock:
            value, should_fetch = self._check_cache(key)
            if not should_fetch:
                return value

            # Fetch new value
            value = await self._fetch_fn(key)
            self._update_cache(key, value)
            return value

    def get(self, key: str) -> Any:
        """Sync get value from cache or fetch it if needed."""
        if self._is_async:
            raise ValueError("Cannot use sync get() with async fetch function")

        # For sync access, we use a simple check-then-act pattern.
        # This is safe because sync functions are not used in an async context.
        value, should_fetch = self._check_cache(key)
        if not should_fetch:
            return value

        # Fetch new value
        value = self._fetch_fn(key)
        self._update_cache(key, value)
        return value

    def _check_cache(self, key: str) -> Tuple[Any, bool]:
        """Check whether key exists in the cache and is still valid.

        Returns:
            A tuple of (value, should_fetch), where should_fetch is True
            if we need to fetch a new value.
        """
        now = time.monotonic()

        if key in self._cache:
            value, expire_time = self._cache[key]
            if expire_time is None or now < expire_time:
                return value, False

        return None, True

    def _update_cache(self, key: str, value: Any) -> None:
        """Update the cache with a new value."""
        now = time.monotonic()

        # Calculate the expiration time.
        expire_time = None
        if (
            self._missing_expire_seconds is not None
            or self._exists_expire_seconds is not None
        ):
            if value is self._missing_object_value:
                expire_time = (
                    now + self._missing_expire_seconds
                    if self._missing_expire_seconds
                    else None
                )
            else:
                expire_time = (
                    now + self._exists_expire_seconds
                    if self._exists_expire_seconds
                    else None
                )

        # Enforce the size limit by removing the soonest-to-expire entry if needed.
        # This is an O(n) operation, but that's fine since the cache size is usually small.
        if len(self._cache) >= self._max_size:
            oldest_key = min(
                self._cache, key=lambda k: self._cache[k].expire_time or float("inf")
            )
            del self._cache[oldest_key]

        self._cache[key] = CacheEntry(value, expire_time)

    def __len__(self) -> int:
        return len(self._cache)


def remote_object_cache(
    max_size: int,
    missing_expire_seconds: Optional[int] = None,
    exists_expire_seconds: Optional[int] = None,
    missing_object_value: Any = None,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """A decorator that provides async caching using CloudObjectCache.

    This is a direct replacement for the asyncache/cachetools combination,
    using CloudObjectCache internally to maintain cache state.

    Args:
        max_size: Maximum number of items to store in the cache.
        missing_expire_seconds: How long to cache missing objects.
        exists_expire_seconds: How long to cache existing objects.
        missing_object_value: Value used to mark missing objects.
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Create a single cache instance for this function.
        cache = CloudObjectCache(
            max_size=max_size,
            fetch_fn=func,
            missing_expire_seconds=missing_expire_seconds,
            exists_expire_seconds=exists_expire_seconds,
            missing_object_value=missing_object_value,
        )

        async def wrapper(*args, **kwargs):
            # Extract the key from either the first positional arg or the object_uri kwarg.
            key = args[0] if args else kwargs.get("object_uri")
            return await cache.aget(key)

        return wrapper

    return decorator
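
As a usage sketch, CloudObjectCache can also be driven directly with a synchronous fetch function; the fetcher and sentinel below are made up for illustration:

import time

MISSING = object()  # hypothetical sentinel for missing objects

def fetch(uri: str):
    # Stand-in for a real cloud lookup.
    return MISSING if uri.endswith("absent") else f"body-of-{uri}"

cache = CloudObjectCache(
    max_size=2,
    fetch_fn=fetch,
    missing_expire_seconds=1,  # re-check missing objects after 1 second
    exists_expire_seconds=60,  # keep existing objects for 60 seconds
    missing_object_value=MISSING,
)

assert cache.get("s3://bucket/a") == "body-of-s3://bucket/a"  # fetched
assert cache.get("s3://bucket/a") == "body-of-s3://bucket/a"  # served from cache
assert cache.get("s3://bucket/absent") is MISSING
time.sleep(1.1)
assert cache.get("s3://bucket/absent") is MISSING  # entry expired, re-fetched
assert len(cache) == 2  # size stays bounded by max_size

Calling cache.aget() here would raise ValueError, since the fetch function is synchronous.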
1 change: 1 addition & 0 deletions python/ray/llm/tests/BUILD
@@ -38,6 +38,7 @@ py_test_module_list(
         "serve/deployments/test_streaming_error_handler.py",
         "serve/observability/usage_telemetry/test_usage.py",
         "serve/deployments/llm/vllm/test_vllm_engine.py",
+        "serve/utils/test_cloud_utils.py",
     ],
     data = glob(["serve/**/*.yaml"]),
     size = "small",
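
The new serve/utils/test_cloud_utils.py itself is not shown in this diff. Under the faking approach the docstring above suggests, a test of the decorator could look roughly like this (a hypothetical sketch, not the file's actual contents):

import asyncio

def test_remote_object_cache_serves_repeated_calls_from_cache():
    calls = []

    @remote_object_cache(max_size=8, exists_expire_seconds=60)
    async def fake_fetch(object_uri: str):
        # Fake fetcher: records calls instead of hitting cloud storage.
        calls.append(object_uri)
        return f"body-of-{object_uri}"

    async def run():
        assert await fake_fetch("s3://bucket/x") == "body-of-s3://bucket/x"
        assert await fake_fetch("s3://bucket/x") == "body-of-s3://bucket/x"

    asyncio.run(run())
    # Only the first call should reach the fetcher.
    assert calls == ["s3://bucket/x"]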