Skip to content

Commit 576a018

Browse files
authored
Merge pull request #856 from simvue-io/853-enable-bulk-run-creation
✨ Added batch run creation
2 parents 9c6e588 + 20b71c4 commit 576a018

File tree

9 files changed

+170
-23
lines changed

9 files changed

+170
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- Added ability to include environment variables within metadata for runs.
88
- Added new feature allowing users to log tensors as multidimensional metrics after defining a grid.
99
- Improves checks on `offline.cache` directory specification in config file.
10+
- Added ability to upload multiple runs as a batch via the low level API.
1011

1112
## [v2.1.2](https://github.com/simvue-io/client/releases/tag/v2.1.2) - 2025-06-25
1213

simvue/api/objects/artifact/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ def __init__(
5959
# from the initial creation
6060
self._init_data: dict[str, dict] = {}
6161

62+
@classmethod
63+
def new(cls, *_, **__) -> Self:
64+
raise NotImplementedError
65+
6266
def commit(self) -> None:
6367
"""Not applicable, cannot commit single write artifact."""
6468
self._logger.info("Cannot call method 'commit' on write-once type 'Artifact'")

simvue/api/objects/artifact/file.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ def new(
9494
_artifact._init_data = {}
9595

9696
else:
97-
_artifact._init_data = _artifact._post(**_artifact._staging)
97+
_artifact._init_data = _artifact._post_single(**_artifact._staging)
9898
_artifact._staging["url"] = _artifact._init_data["url"]
9999

100100
_artifact._init_data["runs"] = kwargs.get("runs") or {}

simvue/api/objects/artifact/object.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ def new(
114114
file.write(_serialized)
115115

116116
else:
117-
_artifact._init_data = _artifact._post(**_artifact._staging)
117+
_artifact._init_data = _artifact._post_single(**_artifact._staging)
118118
_artifact._staging["url"] = _artifact._init_data["url"]
119119

120120
_artifact._init_data["runs"] = kwargs.get("runs") or {}

simvue/api/objects/base.py

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import msgpack
1919
import pydantic
2020

21+
from collections.abc import Generator
2122
from simvue.utilities import staging_merger
2223
from simvue.config.user import SimvueConfiguration
2324
from simvue.exception import ObjectNotFoundError
@@ -172,6 +173,16 @@ def to_params(self) -> dict[str, str]:
172173
return {"id": self.column, "desc": self.descending}
173174

174175

176+
class VisibilityBatchArgs(pydantic.BaseModel):
177+
tenant: bool | None = None
178+
user: list[str] | None = None
179+
public: bool | None = None
180+
181+
182+
class ObjectBatchArgs(pydantic.BaseModel):
183+
pass
184+
185+
175186
class SimvueObject(abc.ABC):
176187
def __init__(
177188
self,
@@ -361,13 +372,21 @@ def _get_visibility(self) -> dict[str, bool | list[str]]:
361372
return {}
362373

363374
@classmethod
375+
@abc.abstractmethod
364376
def new(cls, **_) -> Self:
365377
pass
366378

379+
@classmethod
380+
def batch_create(
381+
cls, obj_args: ObjectBatchArgs, visibility: VisibilityBatchArgs
382+
) -> Generator[str]:
383+
_, __ = obj_args, visibility
384+
raise NotImplementedError
385+
367386
@classmethod
368387
def ids(
369388
cls, count: int | None = None, offset: int | None = None, **kwargs
370-
) -> typing.Generator[str, None, None]:
389+
) -> Generator[str, None, None]:
371390
"""Retrieve a list of all object identifiers.
372391
373392
Parameters
@@ -402,7 +421,7 @@ def get(
402421
count: pydantic.PositiveInt | None = None,
403422
offset: pydantic.NonNegativeInt | None = None,
404423
**kwargs,
405-
) -> typing.Generator[tuple[str, T | None], None, None]:
424+
) -> Generator[tuple[str, T | None], None, None]:
406425
"""Retrieve items of this object type from the server.
407426
408427
Parameters
@@ -467,7 +486,7 @@ def _get_all_objects(
467486
endpoint: str | None = None,
468487
expected_type: type = dict,
469488
**kwargs,
470-
) -> typing.Generator[dict, None, None]:
489+
) -> Generator[dict, None, None]:
471490
_class_instance = cls(_read_only=True)
472491

473492
# Allow the possibility of paginating a URL that is not the
@@ -514,7 +533,7 @@ def read_only(self, is_read_only: bool) -> None:
514533
if not self._read_only:
515534
self._staging = self._get_local_staged()
516535

517-
def commit(self) -> dict | None:
536+
def commit(self) -> dict | list[dict] | None:
518537
"""Send updates to the server, or if offline, store locally."""
519538
if self._read_only:
520539
raise AttributeError("Cannot commit object in 'read-only' mode")
@@ -526,15 +545,22 @@ def commit(self) -> dict | None:
526545
self._cache()
527546
return
528547

529-
_response: dict | None = None
548+
_response: dict[str, str] | list[dict[str, str]] | None = None
530549

531550
# Initial commit is creation of object
532551
# if staging is empty then we do not need to use PUT
533552
if not self._identifier or self._identifier.startswith("offline_"):
534-
self._logger.debug(
535-
f"Posting from staged data for {self._label} '{self.id}': {self._staging}"
536-
)
537-
_response = self._post(**self._staging)
553+
# If batch upload send as list, else send as dictionary of params
554+
if _batch_commit := self._staging.get("batch"):
555+
self._logger.debug(
556+
f"Posting batched data to server: {len(_batch_commit)} {self._label}s"
557+
)
558+
_response = self._post_batch(batch_data=_batch_commit)
559+
else:
560+
self._logger.debug(
561+
f"Posting from staged data for {self._label} '{self.id}': {self._staging}"
562+
)
563+
_response = self._post_single(**self._staging)
538564
elif self._staging:
539565
self._logger.debug(
540566
f"Pushing updates from staged data for {self._label} '{self.id}': {self._staging}"
@@ -570,11 +596,45 @@ def url(self) -> URL | None:
570596
"""
571597
return None if self._identifier is None else self._base_url / self._identifier
572598

573-
def _post(
599+
def _post_batch(
600+
self,
601+
batch_data: list[ObjectBatchArgs],
602+
) -> list[dict[str, str]]:
603+
_response = sv_post(
604+
url=f"{self._base_url}",
605+
headers=self._headers | {"Content-Type": "application/msgpack"},
606+
params=self._params,
607+
data=batch_data,
608+
is_json=True,
609+
)
610+
611+
if _response.status_code == http.HTTPStatus.FORBIDDEN:
612+
raise RuntimeError(
613+
f"Forbidden: You do not have permission to create object of type '{self._label}'"
614+
)
615+
616+
_json_response = get_json_from_response(
617+
response=_response,
618+
expected_status=[http.HTTPStatus.OK, http.HTTPStatus.CONFLICT],
619+
scenario=f"Creation of multiple {self._label}s",
620+
expected_type=list,
621+
)
622+
623+
if not len(batch_data) == (_n_created := len(_json_response)):
624+
raise RuntimeError(
625+
f"Expected {len(batch_data)} to be created, but only {_n_created} found."
626+
)
627+
628+
self._logger.debug(f"successfully created {_n_created} {self._label}s")
629+
630+
return _json_response
631+
632+
def _post_single(
574633
self, *, is_json: bool = True, data: list | dict | None = None, **kwargs
575-
) -> dict[str, typing.Any]:
634+
) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
576635
if not is_json:
577636
kwargs = msgpack.packb(data or kwargs, use_bin_type=True)
637+
578638
_response = sv_post(
579639
url=f"{self._base_url}",
580640
headers=self._headers | {"Content-Type": "application/msgpack"},
@@ -594,11 +654,6 @@ def _post(
594654
scenario=f"Creation of {self._label}",
595655
)
596656

597-
if isinstance(_json_response, list):
598-
raise RuntimeError(
599-
"Expected dictionary from JSON response but got type list"
600-
)
601-
602657
if _id := _json_response.get("id"):
603658
self._logger.debug("'%s' created successfully", _id)
604659
self._identifier = _id

simvue/api/objects/events.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ def new(
7878
**kwargs,
7979
)
8080

81-
def _post(self, **kwargs) -> dict[str, typing.Any]:
82-
return super()._post(is_json=False, **kwargs)
81+
def _post_single(self, **kwargs) -> dict[str, typing.Any]:
82+
return super()._post_single(is_json=False, **kwargs)
8383

8484
def _put(self, **kwargs) -> dict[str, typing.Any]:
8585
raise NotImplementedError("Method 'put' is not available for type Events")

simvue/api/objects/metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ def names(self, run_ids: list[str]) -> list[str]:
138138
expected_type=list,
139139
)
140140

141-
def _post(self, **kwargs) -> dict[str, typing.Any]:
142-
return super()._post(is_json=False, **kwargs)
141+
def _post_single(self, **kwargs) -> dict[str, typing.Any]:
142+
return super()._post_single(is_json=False, **kwargs)
143143

144144
def delete(self, **kwargs) -> dict[str, typing.Any]:
145145
"""Metrics cannot be deleted"""

simvue/api/objects/run.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
88
"""
99

10+
from collections.abc import Generator, Iterable
1011
import http
1112
import typing
1213
import pydantic
@@ -19,7 +20,15 @@
1920
except ImportError:
2021
from typing_extensions import Self
2122

22-
from .base import SimvueObject, Sort, staging_check, Visibility, write_only
23+
from .base import (
24+
ObjectBatchArgs,
25+
VisibilityBatchArgs,
26+
SimvueObject,
27+
Sort,
28+
staging_check,
29+
Visibility,
30+
write_only,
31+
)
2332
from simvue.api.request import (
2433
get as sv_get,
2534
put as sv_put,
@@ -54,6 +63,18 @@ def check_column(cls, column: str) -> str:
5463
return column
5564

5665

66+
class RunBatchArgs(ObjectBatchArgs):
67+
name: str | None = None
68+
description: str | None = None
69+
tags: list[str] | None = None
70+
metadata: dict[str, str | int | float | bool] | None = None
71+
folder: typing.Annotated[str, pydantic.Field(pattern=FOLDER_REGEX)] | None = None
72+
system: dict[str, typing.Any] | None = None
73+
status: typing.Literal[
74+
"terminated", "created", "failed", "completed", "lost", "running"
75+
] = "created"
76+
77+
5778
class Run(SimvueObject):
5879
"""Class for directly interacting with/creating runs on the server."""
5980

@@ -123,6 +144,50 @@ def new(
123144
**kwargs,
124145
)
125146

147+
@classmethod
148+
@pydantic.validate_call
149+
def batch_create(
150+
cls,
151+
entries: Iterable[RunBatchArgs],
152+
*,
153+
visibility: VisibilityBatchArgs | None = None,
154+
folder: typing.Annotated[str, pydantic.StringConstraints(pattern=FOLDER_REGEX)]
155+
| None = None,
156+
metadata: dict[str, str | int | float | bool] | None = None,
157+
) -> Generator[str]:
158+
"""Create a batch of Runs as a single request.
159+
160+
Parameters
161+
----------
162+
entries : Iterable[RunBatchArgs]
163+
define the runs to be created.
164+
visibility : VisibilityBatchArgs | None, optional
165+
specify visibility options for these runs, default is None.
166+
folder : str, optional
167+
override folder specification for these runs to be a single folder, default None.
168+
metadata : dict[str, int | str | float | bool], optional
169+
override metadata specification for these runs, default None.
170+
171+
Yields
172+
------
173+
str
174+
identifiers for created runs
175+
"""
176+
_data: list[dict[str, object]] = [
177+
entry.model_dump(exclude_none=True)
178+
| (
179+
{"visibility": visibility.model_dump(exclude_none=True)}
180+
if visibility
181+
else {}
182+
)
183+
| ({"folder": folder} if folder else {})
184+
| {"metadata": (entry.metadata or {}) | (metadata or {})}
185+
for entry in entries
186+
]
187+
for entry in Run(batch=_data, _read_only=False).commit() or []:
188+
_id: str = entry["id"]
189+
yield _id
190+
126191
@property
127192
@staging_check
128193
def name(self) -> str:

tests/unit/test_run.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55
import datetime
66
import uuid
7+
from simvue.api.objects.run import RunBatchArgs
78
from simvue.sender import sender
89
from simvue.api.objects import Run, Folder
910
from simvue.client import Client
@@ -187,3 +188,24 @@ def test_run_get_properties() -> None:
187188

188189
if _failed:
189190
raise AssertionError("\n" + "\n\t- ".join(": ".join(i) for i in _failed))
191+
192+
193+
@pytest.mark.api
194+
@pytest.mark.online
195+
def test_batch_run_creation() -> None:
196+
_uuid: str = f"{uuid.uuid4()}".split("-")[0]
197+
_folder: Folder = Folder.new(path=f"/simvue_unit_testing/{_uuid}")
198+
_folder.commit()
199+
_runs = [
200+
RunBatchArgs(name=f"batched_run_{i}")
201+
for i in range(10)
202+
]
203+
_counter: int = 0
204+
for i, _id in enumerate(Run.batch_create(entries=_runs, folder=f"/simvue_unit_testing/{_uuid}", metadata={"batch_id": 0})):
205+
_run = Run(identifier=_id)
206+
assert _run.name == f"batched_run_{i}"
207+
assert _run.metadata["batch_id"] == 0
208+
_counter +=1
209+
assert _counter == 10
210+
with contextlib.suppress(Exception):
211+
_folder.delete(recursive=True, delete_runs=True)

0 commit comments

Comments
 (0)