Skip to content

Feature/DB Backed Registry#371

Draft
canelbirlik wants to merge 13 commits intofix/registry/versioning-simplificationfrom
feature/registry-db-backed-backend
Draft

Feature/DB Backed Registry#371
canelbirlik wants to merge 13 commits intofix/registry/versioning-simplificationfrom
feature/registry-db-backed-backend

Conversation

@canelbirlik
Copy link
Contributor

@canelbirlik canelbirlik commented Feb 23, 2026

Overall

Adds a hybrid DB+GCP registry backend.
1- Catalogue operation are performed on DB
2- Inline saving: If object size < inline_threshold, serializes/saves to db instead of GCS.

DB:

  • Added methods for db:
  • update_one: with upsert and optional get prev or next, helps with atomic fetching. prev version while updating(push_single)
  • delete_one,delete_many: delete on canonical ids, not doc id.
  • Creates a dedicated event loop for sync calls so we don't create+close one on async.run.
  • Redis: WIP, added natural key(nk) support for now

Profiling:

10 objects, 1 file each, 2mb per file.

===========================================================
  DB+GCP Hybrid (Mongo)  (10 objects)
============================================================

  Operation                            Time (ms)
  ----------------------------------- ----------
  save(batch x10)                         1804.5
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.push                             1   1762.0
    │   ├─ gcs.upload_batch                    10    950.7
    │   ├─ mongo.obj_meta.update_one           10      0.8
    │   ├─ mongo.commit_plan.insert_one        10     13.0
    │   └─ mongo.commit_plan.delete_one        10      0.6
    └─ archiver.save                           10      0.7
  save(skip x10)                            45.7
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.push                             1      4.8
    │   └─ mongo.obj_meta.find                 10      2.8
    └─ archiver.save                           10      0.6
  save(overwrite x10)                     2339.3
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.push                             1   2292.9
    │   ├─ gcs.upload_batch                    10    899.4
    │   ├─ gcs.delete_batch                    10    251.6
    │   ├─ mongo.obj_meta.update_one           10      0.8
    │   ├─ mongo.commit_plan.insert_one        10      5.4
    │   └─ mongo.commit_plan.delete_one        10      0.6
    └─ archiver.save                           10      0.7
  load(batch x10)                         1618.4
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.pull                             1   1395.7
    │   ├─ gcs.download_batch                   1   1395.5
    │   └─ mongo.obj_meta.find                 11      0.5
    └─ backend.fetch_metadata                   1      0.5
  list_objects()                             1.1
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ mongo.obj_meta.distinct                  1      1.0
  list_versions()                            0.0
  has_object(hit)                            0.7
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ mongo.obj_meta.find                      1      0.7
  has_object(miss)                           0.4
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ mongo.obj_meta.find                      1      0.4
  __len__                                    0.3
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ mongo.obj_meta.distinct                  1      0.3
  keys()                                     0.3
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ mongo.obj_meta.distinct                  1      0.3
  register_materializer()                    0.6
  registered_materializers()                 0.7
  delete(batch x10)                        600.4
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ backend.fetch_metadata                   1      0.6
        ├─ gcs.delete_batch                    10    270.0
        ├─ mongo.obj_meta.find                  1      0.6
        ├─ mongo.obj_meta.delete_one           10      2.5
        ├─ mongo.commit_plan.insert_one        10      2.9
        └─ mongo.commit_plan.delete_one        10      1.0
  list_objects(empty)                        0.6
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ mongo.obj_meta.distinct                  1      0.6
  ─────────────────────────────────── ──────────
  TOTAL                                   6413.1

vs GCP Backend:

  ----------------------------------- ----------
  save(batch x10)                         3280.0
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.push                             1   2920.9
    │   └─ gcs.upload_batch                    10    935.4
    ├─ backend.fetch_metadata                  10    100.9
    └─ archiver.save                           10      0.6
  save(skip x10)                           270.5
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.push                             1    229.4
    ├─ backend.fetch_metadata                  10    117.1
    └─ archiver.save                           10      0.6
  save(overwrite x10)                     4107.3
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.push                             1   4066.5
    │   ├─ gcs.upload_batch                    10    882.6
    │   └─ gcs.delete_batch                    10    255.7
    ├─ backend.fetch_metadata                  10    108.1
    └─ archiver.save                           10      0.6
  load(batch x10)                         2713.2
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    ├─ backend.pull                             1   1254.4
    │   ├─ gcs.download_batch                   1   1254.2
    │   └─ gcs.list_objects                    10    118.8
    └─ backend.fetch_metadata                   1    229.1
  list_objects()                            98.9
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ gcs.list_objects                         1     98.8
  list_versions()                            0.0
  has_object(hit)                          105.3
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ backend.fetch_metadata                   1    105.3
  has_object(miss)                         108.4
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ gcs.list_objects                         1    108.4
  __len__                                  105.5
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ gcs.list_objects                         1    105.5
  keys()                                    99.7
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ gcs.list_objects                         1     99.7
  register_materializer()                  972.5
  registered_materializers()               117.1
  delete(batch x10)                       2221.0
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ backend.fetch_metadata                   1    227.4
        └─ gcs.delete_batch                    10    272.3
  list_objects(empty)                      104.3
    stage                                  calls    avg/ms
    ────────────────────────────────────── ─────  ────────
    └─ gcs.list_objects                         1    104.3
  ─────────────────────────────────── ──────────
  TOTAL                                  14303.9

import logging
import os
import tempfile
import uuid
from pathlib import Path

logging.disable(logging.WARNING)

from mindtrace.core.config import CoreConfig
from mindtrace.registry import Registry
from mindtrace.registry.backends.gcp_db_registry_backend import GCPDBRegistryBackend


def make_sample_files(base: Path, label: str = "default"):
    """Create a fake model directory with a couple of files."""
    base.mkdir(parents=True, exist_ok=True)
    (base / "weights.bin").write_bytes(os.urandom(1024))
    (base / "config.json").write_text(f'{{"layers": 12, "label": "{label}"}}')
    return base


def print_loaded_files(dest: Path):
    """Print loaded file names and contents."""
    for f in sorted(dest.rglob("*"), key=lambda p: p.name):
        if f.is_file():
            try:
                print(f"     {f.name}: {f.read_text()}")
            except UnicodeDecodeError:
                print(f"     {f.name}: <binary, {f.stat().st_size} bytes>")


def main():
    cfg = CoreConfig()
    gcp = cfg.MINDTRACE_GCP
    gcp_reg = cfg.MINDTRACE_GCP_REGISTRY

    run_id = uuid.uuid4().hex[:8]
    prefix = f"example-db-registry-{run_id}"

    db_uri = "mongodb://ip:port"
    db_name = "example_registry_db"

    print(f"Run ID: {run_id}")
    print(f"Backend: MongoDB ({db_uri}/{db_name})")
    print(f"GCS: gs://{gcp_reg.GCP_BUCKET_NAME}/{prefix}")
    print()

    # ── Create backend ───────────────────────────────────────────────────────
    backend = GCPDBRegistryBackend(
        project_id=gcp.GCP_PROJECT_ID,
        bucket_name=gcp_reg.GCP_BUCKET_NAME,
        credentials_path=gcp.GCP_CREDENTIALS_PATH,
        prefix=prefix,
        db_uri=db_uri,
        db_name=db_name,
        preferred_db_backend="mongo",
        allow_index_dropping=True,
    )

    registry = Registry(backend=backend, use_cache=False, version_objects=True, mutable=True)

    with tempfile.TemporaryDirectory() as tmp:
        # ── 1. Single save ───────────────────────────────────────────────────
        print("1. Single save:")
        model_v1 = make_sample_files(Path(tmp) / "model_v1", label="v1")
        registry.save("example:model", model_v1, version="1.0.0")
        print("   Saved example:model@1.0.0")

        # ── 2. Batch save (multiple objects in one call) ─────────────────────
        print("\n2. Batch save:")
        dirs = [make_sample_files(Path(tmp) / f"batch_{i}", label=f"batch-{i}") for i in range(3)]
        names = ["example:model_a", "example:model_b", "example:model_c"]
        versions = ["1.0.0", "1.0.0", "1.0.0"]
        registry.save(names, dirs, version=versions)
        print(f"   Saved {names} @ {versions}")

        # ── 3. Skip on conflict ──────────────────────────────────────────────
        print("\n3. Skip on conflict:")
        from mindtrace.registry.core.exceptions import RegistryVersionConflict
        try:
            registry.save("example:model", model_v1, version="1.0.0", on_conflict="skip")
        except RegistryVersionConflict:
            print("   Single save → RegistryVersionConflict (expected)")
        # Batch save silently skips duplicates
        result = registry.save(["example:model"], [model_v1], version=["1.0.0"], on_conflict="skip")
        print(f"   Batch save  → skipped silently: {result}")

        # ── 4. Catalogue queries (all MongoDB — no GCS scans) ────────────────
        print("\n4. Catalogue queries (all hit MongoDB, zero GCS calls):")
        print(f"   list_objects()      → {registry.list_objects()}")
        print(f"   list_versions()     → {registry.list_versions('example:model')}")
        print(f"   has_object(1.0.0)   → {registry.has_object('example:model', '1.0.0')}")
        print(f"   has_object(9.9.9)   → {registry.has_object('example:model', '9.9.9')}")
        print(f"   len(registry)       → {len(registry)}")
        print(f"   keys()              → {registry.keys()}")

        # ── 5. Single load ───────────────────────────────────────────────────
        print("\n5. Single load:")
        dest = Path(tmp) / "loaded_single"
        registry.load("example:model", version="1.0.0", output_dir=str(dest))
        print(f"   Loaded example:model@1.0.0:")
        print_loaded_files(dest)

        # ── 6. Batch load ────────────────────────────────────────────────────
        print("\n6. Batch load:")
        dest_batch = Path(tmp) / "loaded_batch"
        registry.load(names, version=versions, output_dir=str(dest_batch))
        print(f"   Loaded {names}:")
        print_loaded_files(dest_batch)

        # ── 7. Overwrite ─────────────────────────────────────────────────────
        print("\n7. Overwrite (mutable=True):")
        model_v1_updated = make_sample_files(Path(tmp) / "model_v1_updated", label="v1-updated")
        (model_v1_updated / "readme.txt").write_text("updated model")
        registry.save("example:model", model_v1_updated, version="1.0.0", on_conflict="overwrite")
        dest_ow = Path(tmp) / "loaded_overwritten"
        registry.load("example:model", version="1.0.0", output_dir=str(dest_ow))
        print("   After overwrite:")
        print_loaded_files(dest_ow)

        # ── 8. Materializers ─────────────────────────────────────────────────
        print("\n8. Materializers (stored in dedicated MongoDB collection):")
        registry.register_materializer("example.Model", "example.ModelMaterializer")
        registry.register_materializer("example.Tokenizer", "example.TokenizerMaterializer")
        print(f"   registered_materializers() → {registry.registered_materializers()}")

        # ── 9. Delete ────────────────────────────────────────────────────────
        print("\n9. Delete:")
        registry.delete(
            ["example:model", "example:model_a", "example:model_b", "example:model_c"],
            ["1.0.0", "1.0.0", "1.0.0", "1.0.0"],
        )
        print(f"   Objects after delete → {registry.list_objects()}")

if __name__ == "__main__":
    main()

TODOS:

  • Add Redis methods
  • Tests for both DB methods, Integration tests

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant