Conversation
When a model endpoint is created via POST /v1/llm/model-endpoints with source=HUGGING_FACE and no checkpoint_path, ModelWeightsManager now automatically checks the configured S3/GCS/ABS prefix for cached weights and downloads from HuggingFace Hub + uploads if missing — eliminating the manual sync_model_weights.py step.

- Add ModelWeightsManager with ensure_model_weights_available() (async-safe via run_in_executor, cache-hit skips all I/O)
- Add upload_files() abstract method to LLMArtifactGateway with implementations for S3, GCS, and ABS
- Wire ModelWeightsManager into CreateLLMModelEndpointV1UseCase and the create_model_endpoint API handler
- Fix huggingface_hub.utils._errors import for hub>=0.36 compatibility
- Add unit tests covering cache hit/miss, path construction, and end-to-end integration with CreateLLMModelEndpointV1UseCase

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
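The described flow can be sketched roughly as follows. This is an illustrative sketch only, not the PR's actual implementation: `download_and_upload` is a hypothetical stand-in for the `snapshot_download` + `upload_files` pair, and the gateway interface is reduced to the two calls the flow needs.

```python
import asyncio
import functools

class ModelWeightsManagerSketch:
    """Illustrative sketch of the described cache-check-then-sync flow."""

    def __init__(self, artifact_gateway, prefix: str):
        self.gateway = artifact_gateway
        self.prefix = prefix.rstrip("/")  # trailing slash stripped, per the test plan

    def _get_remote_path(self, hf_repo: str) -> str:
        return f"{self.prefix}/{hf_repo}"

    async def ensure_model_weights_available(self, hf_repo: str) -> str:
        remote_path = self._get_remote_path(hf_repo)
        loop = asyncio.get_running_loop()
        # Offload blocking storage I/O so the event loop stays responsive.
        files = await loop.run_in_executor(
            None, functools.partial(self.gateway.list_files, remote_path)
        )
        if files:  # cache hit: skip all download/upload I/O
            return remote_path
        # Cache miss: run the blocking download+upload in a worker thread too.
        await loop.run_in_executor(
            None, functools.partial(self.gateway.download_and_upload, hf_repo, remote_path)
        )
        return remote_path
```

The same `run_in_executor` + `functools.partial` pattern is what the review comments below ask to be applied consistently to `list_files()`.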
```python
        The remote path (s3://, gs://, or https://) where the weights are stored.
        """
        remote_path = self._get_remote_path(hf_repo)
        files = self.llm_artifact_gateway.list_files(remote_path)
```
**`list_files()` blocks the event loop**
snapshot_download and upload_files are correctly offloaded via run_in_executor, but list_files() on line 46 is a synchronous I/O call (S3 ListObjects / GCS list_blobs / ABS list_blob_names) that runs directly on the async event loop. For consistency with the other two calls, this should also be wrapped in run_in_executor:
```suggestion
        files = await loop.run_in_executor(
            None,
            functools.partial(self.llm_artifact_gateway.list_files, remote_path),
        )
```
Note: loop would need to be obtained before this line — move the loop = asyncio.get_event_loop() line above this call.
```python
            logger.info(f"Cache miss for {hf_repo}. Downloading from HuggingFace Hub...")
            loop = asyncio.get_event_loop()
            with tempfile.TemporaryDirectory() as tmp_dir:
                await loop.run_in_executor(
                    None,
                    functools.partial(
                        snapshot_download,
                        repo_id=hf_repo,
                        local_dir=tmp_dir,
                        ignore_patterns=HF_IGNORE_PATTERNS,
                    ),
                )
                await loop.run_in_executor(
                    None,
                    functools.partial(
                        self.llm_artifact_gateway.upload_files,
                        tmp_dir,
                        remote_path,
                    ),
                )
```
**Unhandled errors from `snapshot_download` will crash endpoint creation**
If snapshot_download fails (e.g., gated model requiring HF auth token, rate limiting, network timeout), the exception propagates uncaught and returns a 500 to the caller. Many models in SUPPORTED_MODELS_INFO (like meta-llama/*) are gated and require authentication. Consider wrapping this in a try/except that logs the error and either raises a user-friendly error or falls back to checkpoint_path = None (allowing downstream logic to handle it):
```python
try:
    await loop.run_in_executor(...)
    await loop.run_in_executor(...)
except Exception as e:
    logger.error(f"Failed to download/upload weights for {hf_repo}: {e}")
    raise ObjectHasInvalidValueException(
        f"Could not download model weights for {hf_repo}. "
        "Ensure the model is accessible and try again, or provide a checkpoint_path explicitly."
    )
```
ensure_model_weights_available is now synchronous — it returns the expected checkpoint path immediately and fires a background asyncio task to sync weights from HuggingFace Hub. An init container is injected into the K8s deployment to poll storage until the weights are present before the main container starts. LLMMetadata gains an hf_weights_syncing flag to signal this flow downstream.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
```python
_HF_WEIGHTS_POLL_SCRIPT = """\
import boto3, os, sys, time
from urllib.parse import urlparse

cp = os.environ["CHECKPOINT_PATH"]
url = urlparse(cp)
bucket = url.netloc
prefix = url.path.lstrip("/")
s3 = boto3.client("s3")
while True:
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    if resp.get("Contents"):
        print(f"Model weights ready at {cp}", flush=True)
        sys.exit(0)
    print(f"Waiting for model weights at {cp}...", flush=True)
    time.sleep(30)
"""
```
**Init container poll script only supports S3**
_HF_WEIGHTS_POLL_SCRIPT hardcodes boto3 and s3.list_objects_v2, but the system supports three storage backends: S3, GCS (gs://), and Azure Blob Storage (https://*.blob.core.windows.net). If hf_user_fine_tuned_weights_prefix points to GCS or ABS, this script will fail at runtime — urlparse("gs://bucket/key") gives netloc="bucket" but the S3 API call will raise an error.
The upload_files implementations in GCSLLMArtifactGateway and ABSLLMArtifactGateway correctly handle their respective backends, so the background sync will succeed, but the init container waiting for the weights will never detect them.
Consider either:
- Dispatching based on the URL scheme (`s3://` → boto3, `gs://` → `google.cloud.storage`, `https://*.blob.core.windows.net` → azure SDK), or
- Using a simpler polling mechanism (e.g., an HTTP HEAD request if the gateway can provide a presigned/public URL).
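The scheme dispatch suggested above could be sketched like this. The function name `pick_backend` and the return labels are illustrative; the real init container would import boto3 / `google.cloud.storage` / the azure SDK lazily based on the result.

```python
from urllib.parse import urlparse

def pick_backend(checkpoint_path: str) -> str:
    """Return which storage backend a checkpoint URL refers to (sketch only)."""
    url = urlparse(checkpoint_path)
    if url.scheme == "s3":
        return "s3"
    if url.scheme == "gs":
        return "gcs"
    if url.scheme == "https" and url.netloc.endswith(".blob.core.windows.net"):
        return "abs"
    raise ValueError(f"Unsupported checkpoint path: {checkpoint_path}")
```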
```python
while True:
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    if resp.get("Contents"):
        print(f"Model weights ready at {cp}", flush=True)
        sys.exit(0)
    print(f"Waiting for model weights at {cp}...", flush=True)
    time.sleep(30)
```
**No timeout — pod hangs forever if weight sync fails**
The while True loop has no timeout or maximum iteration count. If the background _sync_weights task fails (gated HF model, network error, permission issue, disk full), the init container will poll indefinitely, blocking pod startup forever. Since the background task is fire-and-forget with no error propagation back to the caller, there's no signal to the init container that it should stop.
Consider adding a maximum wait time (e.g., 1-2 hours) after which the container exits with a non-zero status and a descriptive error message:
```python
MAX_WAIT_SECONDS = 7200  # 2 hours
elapsed = 0
while elapsed < MAX_WAIT_SECONDS:
    ...
    time.sleep(30)
    elapsed += 30
print(f"Timed out waiting for model weights at {cp}", flush=True)
sys.exit(1)
```
```diff
 checkpoint_files = self.llm_artifact_gateway.list_files(checkpoint_path)
-validate_checkpoint_files(checkpoint_files)
+if checkpoint_files:
+    validate_checkpoint_files(checkpoint_files)
```
**Skipping validation silently changes error behavior for non-sync callers**
Previously, validate_checkpoint_files was always called and would raise ObjectHasInvalidValueException("No safetensors found in the checkpoint path.") if the checkpoint contained no safetensors — catching user misconfiguration early. Now, when list_files returns an empty list (e.g., a wrong/empty S3 prefix that isn't related to weight syncing), validation is silently skipped and the endpoint creation proceeds, only failing later at inference time.
Consider guarding this skip more tightly — e.g., only skip when a hf_weights_syncing flag is passed through, rather than universally skipping for any empty file list:
```python
if checkpoint_files:
    validate_checkpoint_files(checkpoint_files)
elif not hf_weights_syncing:
    raise ObjectHasInvalidValueException(
        f"No files found at checkpoint path: {checkpoint_path}"
    )
```
- Hold a strong set reference to each asyncio.Task to prevent GC cancellation
- Deduplicate concurrent sync requests for the same hf_repo via _in_progress dict
- Surface task exceptions via logger.error in _on_task_done callback
- Store ModelWeightsManager as app.state singleton so state persists across requests
- Add recover_hf_syncs startup handler to re-trigger syncs after server restart

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
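The strong-reference and deduplication pattern described in this commit can be sketched as follows. The class and attribute names mirror the commit message but the bodies are illustrative, not the actual ModelWeightsManager code.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

class SyncTaskTracker:
    """Sketch of the fire-and-forget task tracking described above."""

    def __init__(self):
        self._tasks: set = set()          # strong refs: stop GC from cancelling tasks
        self._in_progress: dict = {}      # hf_repo -> Task, dedupes concurrent requests

    def fire(self, hf_repo, coro_factory):
        # Deduplicate: if a sync for this repo is already running, reuse it.
        if hf_repo in self._in_progress:
            return self._in_progress[hf_repo]
        task = asyncio.get_running_loop().create_task(coro_factory())
        self._tasks.add(task)
        self._in_progress[hf_repo] = task
        task.add_done_callback(lambda t: self._on_task_done(hf_repo, t))
        return task

    def _on_task_done(self, hf_repo, task):
        # Clean up in-memory tracking and surface any exception.
        self._tasks.discard(task)
        self._in_progress.pop(hf_repo, None)
        if not task.cancelled() and task.exception():
            logger.error("Weight sync failed for %s: %s", hf_repo, task.exception())
```

Holding the task in a set matters because `asyncio` only keeps a weak reference to tasks; without a strong reference the task can be garbage-collected mid-flight.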
```python
"FROM endpoints "
"WHERE (endpoint_metadata->'_llm'->>'hf_weights_syncing')::boolean = true"
```
**Raw SQL query missing schema prefix**
The endpoints table lives in the hosted_model_inference schema (see Endpoint.__table_args__ in db/models/hosted_model_inference.py), but this query references FROM endpoints without the schema. Unless the database role's search_path includes hosted_model_inference (which is not configured in db/base.py), this will fail at runtime with relation "endpoints" does not exist.
```suggestion
"SELECT DISTINCT endpoint_metadata->'_llm'->>'model_name' AS model_name "
"FROM hosted_model_inference.endpoints "
"WHERE (endpoint_metadata->'_llm'->>'hf_weights_syncing')::boolean = true"
```
```python
    quantize: Optional[Quantization] = None
    checkpoint_path: Optional[str] = None
    chat_template_override: Optional[str] = None
    hf_weights_syncing: bool = False
```
**`hf_weights_syncing` is never cleared after sync completes**
Once set to True during endpoint creation, no code path ever sets this back to False in the endpoint metadata after the background sync finishes. ModelWeightsManager._on_task_done only cleans up in-memory tracking — it doesn't update the database.
This causes two problems:
1. `recover_hf_syncs` re-triggers downloads on every server restart for all endpoints that ever had this flag set, even if their weights were successfully synced long ago.
2. The init container (`add_hf_weights_init_container`) will be added on every subsequent deployment/update of these endpoints, adding unnecessary startup latency even when weights are already present.
Consider adding a callback in _on_task_done (on success) that updates the endpoint metadata to set hf_weights_syncing: false, or have the recover_hf_syncs startup handler check whether the weights are actually present before re-triggering.
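The suggested success callback could look roughly like this. `update_endpoint_metadata` is a hypothetical persistence hook (the real fix would be a DB update through the existing repository layer), not an existing API.

```python
def on_task_done(task, hf_repo, update_endpoint_metadata):
    """Sketch: on successful sync, persist hf_weights_syncing=False."""
    if task.cancelled():
        return
    if task.exception() is not None:
        # Failure path: keep the flag set so recovery can retry.
        print(f"Weight sync failed for {hf_repo}: {task.exception()}")
        return
    # Success: clear the flag so restarts and redeploys stop treating
    # this endpoint as still syncing.
    update_endpoint_metadata(hf_repo, {"hf_weights_syncing": False})
```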
The endpoints table lives in hosted_model_inference schema; bare 'FROM endpoints' would fail at runtime.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
```python
        # Resolve checkpoint path: fires background sync and returns expected path immediately
        checkpoint_path = request.checkpoint_path
        hf_weights_syncing = False
        if (
            checkpoint_path is None
            and request.source == LLMSource.HUGGING_FACE
            and self.model_weights_manager is not None
        ):
            models_info = SUPPORTED_MODELS_INFO.get(request.model_name)
            if models_info and models_info.hf_repo:
                checkpoint_path = self.model_weights_manager.ensure_model_weights_available(
                    models_info.hf_repo
                )
                hf_weights_syncing = True
```
**`hf_weights_syncing` set unconditionally on cache hit**
ensure_model_weights_available returns the path immediately without checking whether weights are already cached (the cache check happens asynchronously in the background task). This means hf_weights_syncing is always set to True — even when the weights are already present in storage.
Consequence: on every HF endpoint creation (even with cached weights), the init container in add_hf_weights_init_container is added to the pod spec. The init container will exit quickly on cache hit (since it polls S3 and finds the files), but it still adds startup latency from the init container spin-up and S3 ListObjects call.
Consider making ensure_model_weights_available synchronously check the cache and return a flag indicating whether syncing is actually needed, so hf_weights_syncing is only True when the background task is genuinely downloading:
```python
checkpoint_path, needs_sync = self.model_weights_manager.ensure_model_weights_available(
    models_info.hf_repo
)
hf_weights_syncing = needs_sync
```
```python
async def recover_hf_syncs():
    """Re-trigger weight syncs for endpoints that were syncing when server last stopped."""
    from model_engine_server.db.base import get_session_async
    from model_engine_server.infra.repositories.live_tokenizer_repository import (
        SUPPORTED_MODELS_INFO,
    )
    from sqlalchemy import text

    session_factory = get_session_async()
    try:
        async with session_factory() as session:
            result = await session.execute(
                text(
                    "SELECT DISTINCT endpoint_metadata->'_llm'->>'model_name' AS model_name "
                    "FROM hosted_model_inference.endpoints "
                    "WHERE (endpoint_metadata->'_llm'->>'hf_weights_syncing')::boolean = true"
                )
            )
            model_names = [row.model_name for row in result if row.model_name]
    except Exception:
        logger.warning("Could not query pending HF sync endpoints at startup", exc_info=True)
        return
    for model_name in model_names:
        info = SUPPORTED_MODELS_INFO.get(model_name)
        if info and info.hf_repo:
            app.state.model_weights_manager.ensure_model_weights_available(info.hf_repo)
            logger.info(f"Startup: re-triggered HF weight sync for {model_name}")
```
**`recover_hf_syncs` re-triggers syncs on every restart, indefinitely**
Since hf_weights_syncing is never cleared to false after a successful sync (the _on_task_done callback only cleans up in-memory state, not the DB), this startup handler will re-trigger downloads for every endpoint that has ever had hf_weights_syncing=true — even if their weights were successfully synced long ago.
This means on every server restart:
1. Unnecessary background `snapshot_download` + `upload_files` tasks are spawned for already-synced models (the cache-hit path returns quickly, but still issues a `list_files` I/O call per model).
2. The init container (`add_hf_weights_init_container`) will continue to be added to every subsequent deployment of these endpoints, adding startup latency even when weights are present.
Consider either:
- Adding a step in `_on_task_done` (on success) that updates the DB to set `hf_weights_syncing: false`, or
- Having this handler check whether weights are actually present (via `list_files`) before re-triggering the sync.
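The second option above amounts to a small presence check before re-triggering. A minimal sketch, where `list_files` stands in for `llm_artifact_gateway.list_files` and the function name is hypothetical:

```python
def should_retrigger_sync(remote_path, list_files):
    """Only re-trigger a sync when no weight files exist at the cached location."""
    try:
        return len(list_files(remote_path)) == 0
    except Exception:
        # If storage can't be queried, err on the side of re-syncing.
        return True
```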
```python
_HF_WEIGHTS_POLL_SCRIPT = """\
import boto3, os, sys, time
from urllib.parse import urlparse

cp = os.environ["CHECKPOINT_PATH"]
url = urlparse(cp)
bucket = url.netloc
prefix = url.path.lstrip("/")
s3 = boto3.client("s3")
while True:
    resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, MaxKeys=1)
    if resp.get("Contents"):
        print(f"Model weights ready at {cp}", flush=True)
        sys.exit(0)
    print(f"Waiting for model weights at {cp}...", flush=True)
    time.sleep(30)
"""
```
**Init container poll script hardcodes S3 — fails on GCS/Azure**
_HF_WEIGHTS_POLL_SCRIPT imports boto3 and calls s3.list_objects_v2, but the system supports three storage backends (S3, GCS, Azure Blob Storage). The upload_files implementations for GCS and ABS correctly handle their respective backends, so the background sync will succeed — but the init container will never detect the uploaded weights on non-S3 backends and will poll indefinitely.
Additionally, the while True loop has no timeout or maximum iteration count. If the background sync task fails for any reason (gated HF model, network error, permissions), the init container blocks pod startup forever.
Consider:
1. Dispatching on URL scheme (`s3://` → boto3, `gs://` → `google.cloud.storage`, `https://*.blob.core.windows.net` → azure SDK)
2. Adding a maximum wait time (e.g., 1–2 hours) after which the init container exits non-zero with an error message
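The bounded-wait idea in point 2 can be sketched as a backend-agnostic poll loop. `wait_for_weights` and the injectable `check`/`sleep` parameters are illustrative, not part of the PR; the real init container would pass a closure that calls whichever storage SDK the scheme dispatch selected.

```python
import time

def wait_for_weights(check, poll_interval=30, max_wait_seconds=7200, sleep=time.sleep):
    """Poll `check()` until it returns True or the deadline passes.

    Returns True on success and False on timeout, instead of looping forever.
    """
    elapsed = 0
    while elapsed < max_wait_seconds:
        if check():
            return True
        sleep(poll_interval)
        elapsed += poll_interval
    return False
```

The init container script would then `sys.exit(0)` on True and `sys.exit(1)` with a descriptive message on False.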
Summary
- `ModelWeightsManager` class that automatically checks for cached model weights in S3/GCS/ABS and downloads from HuggingFace Hub if missing, eliminating the need to manually run `sync_model_weights.py` before creating an endpoint
- `upload_files()` added to `LLMArtifactGateway` (abstract) with implementations for S3, GCS, and ABS
- `ModelWeightsManager` wired into `CreateLLMModelEndpointV1UseCase` — triggers only when `checkpoint_path is None` and `source == HUGGING_FACE`
- `huggingface_hub.utils._errors` import compatibility for hub >= 0.36

How it works
Both HF download and S3 upload run in `run_in_executor` to avoid blocking the async event loop.

Test plan
- `test_cache_hit_skips_download` — no download/upload on cache hit
- `test_cache_hit_returns_correct_s3_path` — correct `{prefix}/{hf_repo}` path returned
- `test_cache_miss_calls_snapshot_download_and_upload` — HF download + upload called with correct args
- `test_s3_path_construction` — trailing slash stripped correctly from prefix
- `test_create_llm_model_endpoint_calls_weights_manager_on_hf_source` — weights manager invoked from use case, resolved path forwarded to bundle creation
- Run with `pytest tests/unit/domain/test_model_weights_manager.py`

🤖 Generated with Claude Code
Greptile Summary
Adds a `ModelWeightsManager` that automatically downloads model weights from HuggingFace Hub and uploads them to cloud storage (S3/GCS/ABS) in the background when creating an endpoint with `checkpoint_path=None` and `source=HUGGING_FACE`. This eliminates the need to manually run `sync_model_weights.py` before endpoint creation. The PR also adds `upload_files()` to the `LLMArtifactGateway` abstract class with implementations for all three cloud backends, wires the manager into the endpoint creation use case, adds a K8s init container to poll for weight availability, and fixes `huggingface_hub` import compatibility for hub >= 0.36.

- The `hf_weights_syncing` flag is never cleared in the database after a successful sync, causing `recover_hf_syncs` to re-trigger downloads on every server restart for all endpoints that ever used this feature
- The init container poll script (`_HF_WEIGHTS_POLL_SCRIPT`) hardcodes `boto3`/S3 API calls, so it will fail at runtime for GCS and Azure deployments despite the upload logic supporting all three backends
- The `upload_files()` implementations and test coverage are solid across all three cloud providers

Confidence Score: 2/5
`k8s_endpoint_resource_delegate.py` (S3-only init container with no timeout) and `app.py` (perpetual re-trigger in `recover_hf_syncs` due to `hf_weights_syncing` never being cleared).

Important Files Changed
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant API as POST /v1/llm/model-endpoints
    participant UseCase as CreateLLMModelEndpointV1UseCase
    participant MWM as ModelWeightsManager
    participant BG as Background Task
    participant Storage as S3/GCS/ABS
    participant HF as HuggingFace Hub
    participant K8s as K8s Init Container
    Client->>API: Create endpoint (no checkpoint_path)
    API->>UseCase: execute(request)
    UseCase->>MWM: ensure_model_weights_available(hf_repo)
    MWM-->>UseCase: remote_path (immediate return)
    MWM->>BG: asyncio.create_task(_sync_weights)
    UseCase->>UseCase: Create bundle + endpoint (hf_weights_syncing=true)
    UseCase-->>Client: 200 OK (endpoint created)
    Note over BG,Storage: Background (non-blocking)
    BG->>Storage: list_files(remote_path)
    alt Cache hit
        Storage-->>BG: files[] (non-empty)
        BG->>BG: return early
    else Cache miss
        Storage-->>BG: [] (empty)
        BG->>HF: snapshot_download(hf_repo)
        HF-->>BG: local files
        BG->>Storage: upload_files(local, remote)
    end
    Note over K8s,Storage: Pod startup
    K8s->>Storage: Poll list_objects (every 30s)
    Storage-->>K8s: files found
    K8s->>K8s: exit(0) — main container starts
```

Last reviewed commit: 3a18aee