Skip to content

Commit efd2780

Browse files
committed
add AssetsResolver support [in progress]
1 parent a763cbd commit efd2780

File tree

10 files changed

+459
-98
lines changed

10 files changed

+459
-98
lines changed

alembic_db/versions/0001_assets.py

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
# File: /alembic_db/versions/0001_assets.py
12
"""initial assets schema + per-asset state cache
23
34
Revision ID: 0001_assets
@@ -22,15 +23,12 @@ def upgrade() -> None:
2223
sa.Column("size_bytes", sa.BigInteger(), nullable=False, server_default="0"),
2324
sa.Column("mime_type", sa.String(length=255), nullable=True),
2425
sa.Column("refcount", sa.BigInteger(), nullable=False, server_default="0"),
25-
sa.Column("storage_backend", sa.String(length=32), nullable=False, server_default="fs"),
26-
sa.Column("storage_locator", sa.Text(), nullable=False),
2726
sa.Column("created_at", sa.DateTime(timezone=False), nullable=False),
2827
sa.Column("updated_at", sa.DateTime(timezone=False), nullable=False),
2928
sa.CheckConstraint("size_bytes >= 0", name="ck_assets_size_nonneg"),
3029
sa.CheckConstraint("refcount >= 0", name="ck_assets_refcount_nonneg"),
3130
)
3231
op.create_index("ix_assets_mime_type", "assets", ["mime_type"])
33-
op.create_index("ix_assets_backend_locator", "assets", ["storage_backend", "storage_locator"])
3432

3533
# ASSETS_INFO: user-visible references (mutable metadata)
3634
op.create_table(
@@ -52,11 +50,12 @@ def upgrade() -> None:
5250
op.create_index("ix_assets_info_name", "assets_info", ["name"])
5351
op.create_index("ix_assets_info_created_at", "assets_info", ["created_at"])
5452
op.create_index("ix_assets_info_last_access_time", "assets_info", ["last_access_time"])
53+
op.create_index("ix_assets_info_owner_name", "assets_info", ["owner_id", "name"])
5554

5655
# TAGS: normalized tag vocabulary
5756
op.create_table(
5857
"tags",
59-
sa.Column("name", sa.String(length=128), primary_key=True),
58+
sa.Column("name", sa.String(length=512), primary_key=True),
6059
sa.Column("tag_type", sa.String(length=32), nullable=False, server_default="user"),
6160
sa.CheckConstraint("name = lower(name)", name="ck_tags_lowercase"),
6261
)
@@ -65,8 +64,8 @@ def upgrade() -> None:
6564
# ASSET_INFO_TAGS: many-to-many for tags on AssetInfo
6665
op.create_table(
6766
"asset_info_tags",
68-
sa.Column("asset_info_id", sa.BigInteger(), sa.ForeignKey("assets_info.id", ondelete="CASCADE"), nullable=False),
69-
sa.Column("tag_name", sa.String(length=128), sa.ForeignKey("tags.name", ondelete="RESTRICT"), nullable=False),
67+
sa.Column("asset_info_id", sa.Integer(), sa.ForeignKey("assets_info.id", ondelete="CASCADE"), nullable=False),
68+
sa.Column("tag_name", sa.String(length=512), sa.ForeignKey("tags.name", ondelete="RESTRICT"), nullable=False),
7069
sa.Column("origin", sa.String(length=32), nullable=False, server_default="manual"),
7170
sa.Column("added_by", sa.String(length=128), nullable=True),
7271
sa.Column("added_at", sa.DateTime(timezone=False), nullable=False),
@@ -75,15 +74,15 @@ def upgrade() -> None:
7574
op.create_index("ix_asset_info_tags_tag_name", "asset_info_tags", ["tag_name"])
7675
op.create_index("ix_asset_info_tags_asset_info_id", "asset_info_tags", ["asset_info_id"])
7776

78-
# ASSET_LOCATOR_STATE: 1:1 filesystem metadata(for fast integrity checking) for an Asset records
77+
# ASSET_CACHE_STATE: 1:1 local cache metadata for an Asset
7978
op.create_table(
80-
"asset_locator_state",
79+
"asset_cache_state",
8180
sa.Column("asset_hash", sa.String(length=256), sa.ForeignKey("assets.hash", ondelete="CASCADE"), primary_key=True),
81+
sa.Column("file_path", sa.Text(), nullable=False), # absolute local path to cached file
8282
sa.Column("mtime_ns", sa.BigInteger(), nullable=True),
83-
sa.Column("etag", sa.String(length=256), nullable=True),
84-
sa.Column("last_modified", sa.String(length=128), nullable=True),
85-
sa.CheckConstraint("(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_als_mtime_nonneg"),
83+
sa.CheckConstraint("(mtime_ns IS NULL) OR (mtime_ns >= 0)", name="ck_acs_mtime_nonneg"),
8684
)
85+
op.create_index("ix_asset_cache_state_file_path", "asset_cache_state", ["file_path"])
8786

8887
# ASSET_INFO_META: typed KV projection of user_metadata for filtering/sorting
8988
op.create_table(
@@ -102,6 +101,21 @@ def upgrade() -> None:
102101
op.create_index("ix_asset_info_meta_key_val_num", "asset_info_meta", ["key", "val_num"])
103102
op.create_index("ix_asset_info_meta_key_val_bool", "asset_info_meta", ["key", "val_bool"])
104103

104+
# ASSET_LOCATIONS: remote locations per asset
105+
op.create_table(
106+
"asset_locations",
107+
sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True),
108+
sa.Column("asset_hash", sa.String(length=256), sa.ForeignKey("assets.hash", ondelete="CASCADE"), nullable=False),
109+
sa.Column("provider", sa.String(length=32), nullable=False), # e.g., "gcs"
110+
sa.Column("locator", sa.Text(), nullable=False), # e.g., "gs://bucket/path/to/blob"
111+
sa.Column("expected_size_bytes", sa.BigInteger(), nullable=True),
112+
sa.Column("etag", sa.String(length=256), nullable=True),
113+
sa.Column("last_modified", sa.String(length=128), nullable=True),
114+
sa.UniqueConstraint("asset_hash", "provider", "locator", name="uq_asset_locations_triplet"),
115+
)
116+
op.create_index("ix_asset_locations_hash", "asset_locations", ["asset_hash"])
117+
op.create_index("ix_asset_locations_provider", "asset_locations", ["provider"])
118+
105119
# Tags vocabulary for models
106120
tags_table = sa.table(
107121
"tags",
@@ -143,13 +157,18 @@ def upgrade() -> None:
143157

144158

145159
def downgrade() -> None:
160+
op.drop_index("ix_asset_locations_provider", table_name="asset_locations")
161+
op.drop_index("ix_asset_locations_hash", table_name="asset_locations")
162+
op.drop_table("asset_locations")
163+
146164
op.drop_index("ix_asset_info_meta_key_val_bool", table_name="asset_info_meta")
147165
op.drop_index("ix_asset_info_meta_key_val_num", table_name="asset_info_meta")
148166
op.drop_index("ix_asset_info_meta_key_val_str", table_name="asset_info_meta")
149167
op.drop_index("ix_asset_info_meta_key", table_name="asset_info_meta")
150168
op.drop_table("asset_info_meta")
151169

152-
op.drop_table("asset_locator_state")
170+
op.drop_index("ix_asset_cache_state_file_path", table_name="asset_cache_state")
171+
op.drop_table("asset_cache_state")
153172

154173
op.drop_index("ix_asset_info_tags_asset_info_id", table_name="asset_info_tags")
155174
op.drop_index("ix_asset_info_tags_tag_name", table_name="asset_info_tags")
@@ -159,13 +178,13 @@ def downgrade() -> None:
159178
op.drop_table("tags")
160179

161180
op.drop_constraint("uq_assets_info_hash_owner_name", table_name="assets_info")
181+
op.drop_index("ix_assets_info_owner_name", table_name="assets_info")
162182
op.drop_index("ix_assets_info_last_access_time", table_name="assets_info")
163183
op.drop_index("ix_assets_info_created_at", table_name="assets_info")
164184
op.drop_index("ix_assets_info_name", table_name="assets_info")
165185
op.drop_index("ix_assets_info_asset_hash", table_name="assets_info")
166186
op.drop_index("ix_assets_info_owner_id", table_name="assets_info")
167187
op.drop_table("assets_info")
168188

169-
op.drop_index("ix_assets_backend_locator", table_name="assets")
170189
op.drop_index("ix_assets_mime_type", table_name="assets")
171190
op.drop_table("assets")

app/_assets_helpers.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ def resolve_destination_from_tags(tags: list[str]) -> tuple[str, list[str]]:
105105
if root == "models":
106106
if len(tags) < 2:
107107
raise ValueError("at least two tags required for model asset")
108-
bases = folder_paths.folder_names_and_paths[tags[1]][0]
108+
try:
109+
bases = folder_paths.folder_names_and_paths[tags[1]][0]
110+
except KeyError:
111+
raise ValueError(f"unknown model category '{tags[1]}'")
109112
if not bases:
110113
raise ValueError(f"no base path configured for category '{tags[1]}'")
111114
base_dir = os.path.abspath(bases[0])

app/api/assets_routes.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import folder_paths
1010

1111
from .. import assets_manager, assets_scanner
12-
from . import schemas_in
12+
from . import schemas_in, schemas_out
1313

1414

1515
ROUTES = web.RouteTableDef()
@@ -69,7 +69,7 @@ async def download_asset_content(request: web.Request) -> web.Response:
6969
except FileNotFoundError:
7070
return _error_response(404, "FILE_NOT_FOUND", "Underlying file not found on disk.")
7171

72-
quoted = filename.replace('"', "'")
72+
quoted = (filename or "").replace("\r", "").replace("\n", "").replace('"', "'")
7373
cd = f'{disposition}; filename="{quoted}"; filename*=UTF-8\'\'{urllib.parse.quote(filename)}'
7474

7575
resp = web.FileResponse(abs_path)
@@ -182,6 +182,8 @@ async def upload_asset(request: web.Request) -> web.Response:
182182
client_filename=file_client_name,
183183
)
184184
return web.json_response(created.model_dump(mode="json"), status=201)
185+
except ValueError:
186+
return _error_response(400, "BAD_REQUEST", "Invalid inputs.")
185187
except Exception:
186188
try:
187189
if os.path.exists(tmp_path):
@@ -341,6 +343,7 @@ async def get_asset_scan_status(request: web.Request) -> web.Response:
341343
states = assets_scanner.current_statuses()
342344
if root in {"models", "input", "output"}:
343345
states = [s for s in states.scans if s.root == root] # type: ignore
346+
states = schemas_out.AssetScanStatusResponse(scans=states)
344347
return web.json_response(states.model_dump(mode="json"), status=200)
345348

346349

app/assets_fetcher.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from __future__ import annotations
2+
import asyncio
3+
import os
4+
import tempfile
5+
from typing import Optional
6+
import aiohttp
7+
8+
from .storage.hashing import blake3_hash_sync
9+
from .database.db import create_session
10+
from .database.services import ingest_fs_asset, get_cache_state_by_asset_hash
11+
from .resolvers import resolve_asset
12+
from ._assets_helpers import resolve_destination_from_tags, ensure_within_base
13+
14+
# Registry of one asyncio.Lock per asset hash. ensure_asset_cached() takes the
# lock for its hash before checking/downloading, so concurrent calls for the
# same asset run one at a time.
# NOTE(review): nothing in the code visible here ever removes entries, so the
# dict grows with the number of distinct hashes requested — confirm acceptable.
_FETCH_LOCKS: dict[str, asyncio.Lock] = {}
15+
16+
17+
def _sanitize_filename(name: str) -> str:
18+
return os.path.basename((name or "").strip()) or "file"
19+
20+
21+
async def ensure_asset_cached(
    asset_hash: str,
    *,
    preferred_name: Optional[str] = None,
    tags_hint: Optional[list[str]] = None,
) -> str:
    """Ensure there is a verified local file for `asset_hash` in the correct Comfy folder.

    Policy:
      - Resolver must provide valid tags (root and, for models, category),
        unless `tags_hint` is given, which takes precedence.
      - If a cache state already exists and its file is present, that path is
        trusted and returned without re-hashing.
      - If the target path already exists:
          * if hash matches -> reuse & ingest
          * else            -> remove and overwrite with the correct content
      - Otherwise the asset is downloaded to a temp file next to the
        destination, hash-verified, and moved into place.

    Args:
        asset_hash: Canonical hash, compared against ``"blake3:<digest>"``.
        preferred_name: File name to use when the resolver supplies none.
        tags_hint: Placement tags overriding the resolver's tags.

    Returns:
        Absolute path of the local file holding the asset.

    Raises:
        FileNotFoundError: The resolver returned nothing for `asset_hash`.
        ValueError: No placement tags, Content-Length differs from the
            expected size, or the downloaded content has a different hash.
        Exception: Whatever `resolve_destination_from_tags`,
            `ensure_within_base`, aiohttp or the filesystem raise.
    """
    lock = _FETCH_LOCKS.setdefault(asset_hash, asyncio.Lock())
    async with lock:
        # 1) If we already have a state -> trust the path
        async with await create_session() as sess:
            state = await get_cache_state_by_asset_hash(sess, asset_hash=asset_hash)
            if state and os.path.isfile(state.file_path):
                return state.file_path

        # 2) Resolve remote location + placement hints (must include valid tags)
        res = await resolve_asset(asset_hash)
        if not res:
            raise FileNotFoundError(f"No resolver/locations for {asset_hash}")

        placement_tags = tags_hint or res.tags
        if not placement_tags:
            raise ValueError(f"Resolver did not provide placement tags for {asset_hash}")

        name_hint = res.filename or preferred_name or asset_hash.replace(":", "_")
        safe_name = _sanitize_filename(name_hint)

        # 3) Map tags -> destination (strict: raises if invalid root or models category)
        base_dir, subdirs = resolve_destination_from_tags(placement_tags)  # may raise
        dest_dir = os.path.join(base_dir, *subdirs) if subdirs else base_dir
        os.makedirs(dest_dir, exist_ok=True)

        final_path = os.path.abspath(os.path.join(dest_dir, safe_name))
        ensure_within_base(final_path, base_dir)

        # 4) If target path exists, try to reuse; else delete invalid cache
        if os.path.isfile(final_path):
            # NOTE(review): blake3_hash_sync runs on the event loop thread and
            # will block it for large files — consider an executor.
            existing_digest = blake3_hash_sync(final_path)
            if f"blake3:{existing_digest}" == asset_hash:
                await _record_cached_file(asset_hash, final_path)
                return final_path
            # Invalid cache: remove before re-downloading
            os.remove(final_path)

        # 5) Download to temp next to destination (same directory, so the
        #    final os.replace stays on one filesystem)
        tmp_path = await _download_to_temp(res, dest_dir)

        moved = False
        try:
            # 6) Verify content hash
            canonical = f"blake3:{blake3_hash_sync(tmp_path)}"
            if canonical != asset_hash:
                raise ValueError(f"Hash mismatch: expected {asset_hash}, got {canonical}")

            # 7) Atomically move into place; os.replace overwrites an existing
            #    file itself, so no separate remove is needed.
            os.replace(tmp_path, final_path)
            moved = True
        finally:
            # Never leave an unverified temp file behind in the asset folder.
            if not moved:
                _remove_quietly(tmp_path)

        # 8) Record identity + cache state
        await _record_cached_file(asset_hash, final_path)
        return final_path


def _remove_quietly(path: str) -> None:
    """Delete `path`, ignoring filesystem errors (best-effort cleanup)."""
    try:
        os.remove(path)
    except OSError:
        pass


async def _record_cached_file(asset_hash: str, abs_path: str) -> None:
    """Pass the file's size and mtime to `ingest_fs_asset` and commit the session."""
    st = os.stat(abs_path)
    async with await create_session() as sess:
        await ingest_fs_asset(
            sess,
            asset_hash=asset_hash,
            abs_path=abs_path,
            size_bytes=st.st_size,
            mtime_ns=st.st_mtime_ns,
            mime_type=None,
            info_name=None,
            tags=(),
        )
        await sess.commit()


async def _download_to_temp(res, dest_dir: str) -> str:
    """Stream `res.download_url` into a new temp file in `dest_dir` and return its path.

    Uses `res.headers` as request headers and, when both are present, checks
    the response Content-Length against `res.expected_size`. The temp file is
    removed again if anything fails (including cancellation) before returning.

    Raises:
        ValueError: Content-Length does not match the expected size.
    """
    tmp_path: Optional[str] = None
    timeout = aiohttp.ClientTimeout(total=60 * 30)  # 30 minutes for the whole transfer
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(res.download_url, headers=dict(res.headers)) as resp:
                resp.raise_for_status()
                content_length = resp.headers.get("Content-Length")
                if res.expected_size and content_length and int(content_length) != int(res.expected_size):
                    raise ValueError("server Content-Length does not match expected size")
                with tempfile.NamedTemporaryFile("wb", delete=False, dir=dest_dir) as tmp:
                    tmp_path = tmp.name
                    async for chunk in resp.content.iter_chunked(8 * 1024 * 1024):
                        if chunk:
                            # Plain file object: write() is synchronous, no await.
                            tmp.write(chunk)
    except BaseException:
        # BaseException so that asyncio cancellation also cleans up; the file
        # is already closed here because the `with` block has exited.
        if tmp_path is not None:
            _remove_quietly(tmp_path)
        raise
    return tmp_path

0 commit comments

Comments
 (0)