Skip to content

Commit 3d3dfd0

Browse files
Speed up json serialization with orjson and custom FastAPI responses (#2880)
* Use orjson * Use CustomORJSONResponse for runs API * refactor: wrap fleet API responses with CustomORJSONResponse * feat: add response_model to decorators and remove return type annotations Co-authored-by: aider (bedrock/us.anthropic.claude-sonnet-4-20250514-v1:0) <aider@aider.chat> * refactor: Consistently use CustomORJSONResponse and response_model in router files Co-authored-by: aider (bedrock/us.anthropic.claude-sonnet-4-20250514-v1:0) <aider@aider.chat> * Replace validators with dict() for serialization tweaks * Fix linting * Fix asyncpg.pgproto.pgproto.UUID serialization * Fix extra import * Use orjson with indentation for json schema * Fix generate-json-schema CI * Fix import --------- Co-authored-by: aider (bedrock/us.anthropic.claude-sonnet-4-20250514-v1:0) <aider@aider.chat>
1 parent ab5dfbf commit 3d3dfd0

File tree

29 files changed

+559
-350
lines changed

29 files changed

+559
-350
lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,8 @@ jobs:
240240
run: uv sync
241241
- name: Generate json schema
242242
run: |
243-
uv run python -c "from dstack._internal.core.models.configurations import DstackConfiguration; print(DstackConfiguration.schema_json(indent=2))" > configuration.json
244-
uv run python -c "from dstack._internal.core.models.profiles import ProfilesConfig; print(ProfilesConfig.schema_json(indent=2))" > profiles.json
243+
uv run python -c "from dstack._internal.core.models.configurations import DstackConfiguration; print(DstackConfiguration.schema_json())" > configuration.json
244+
uv run python -c "from dstack._internal.core.models.profiles import ProfilesConfig; print(ProfilesConfig.schema_json())" > profiles.json
245245
- name: Upload json schema to S3
246246
run: |
247247
VERSION=$((${{ github.run_number }} + ${{ env.BUILD_INCREMENT }}))

.github/workflows/release.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ jobs:
311311
run: uv sync
312312
- name: Generate json schema
313313
run: |
314-
uv run python -c "from dstack._internal.core.models.configurations import DstackConfiguration; print(DstackConfiguration.schema_json(indent=2))" > configuration.json
315-
uv run python -c "from dstack._internal.core.models.profiles import ProfilesConfig; print(ProfilesConfig.schema_json(indent=2))" > profiles.json
314+
uv run python -c "from dstack._internal.core.models.configurations import DstackConfiguration; print(DstackConfiguration.schema_json())" > configuration.json
315+
uv run python -c "from dstack._internal.core.models.profiles import ProfilesConfig; print(ProfilesConfig.schema_json())" > profiles.json
316316
- name: Upload json schema to S3
317317
run: |
318318
VERSION=${GITHUB_REF#refs/tags/}

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dependencies = [
3535
"gpuhunt==0.1.6",
3636
"argcomplete>=3.5.0",
3737
"ignore-python>=0.2.0",
38+
"orjson",
3839
]
3940

4041
[project.urls]

src/dstack/_internal/core/models/common.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import re
22
from enum import Enum
3-
from typing import Union
3+
from typing import Any, Callable, Optional, Union
44

5+
import orjson
56
from pydantic import Field
67
from pydantic_duality import DualBaseModel
78
from typing_extensions import Annotated
89

10+
from dstack._internal.utils.json_utils import pydantic_orjson_dumps
11+
912
IncludeExcludeFieldType = Union[int, str]
1013
IncludeExcludeSetType = set[IncludeExcludeFieldType]
1114
IncludeExcludeDictType = dict[
@@ -20,7 +23,40 @@
2023
# This allows to use the same model both for a strict parsing of the user input and
2124
# for a permissive parsing of the server responses.
2225
class CoreModel(DualBaseModel):
23-
pass
26+
class Config:
27+
json_loads = orjson.loads
28+
json_dumps = pydantic_orjson_dumps
29+
30+
def json(
31+
self,
32+
*,
33+
include: Optional[IncludeExcludeType] = None,
34+
exclude: Optional[IncludeExcludeType] = None,
35+
by_alias: bool = False,
36+
skip_defaults: Optional[bool] = None, # ignore as it's deprecated
37+
exclude_unset: bool = False,
38+
exclude_defaults: bool = False,
39+
exclude_none: bool = False,
40+
encoder: Optional[Callable[[Any], Any]] = None,
41+
models_as_dict: bool = True, # does not seems to be needed by dstack or dependencies
42+
**dumps_kwargs: Any,
43+
) -> str:
44+
"""
45+
Override `json()` method so that it calls `dict()`.
46+
Allows changing how models are serialized by overriding `dict()` only.
47+
By default, `json()` won't call `dict()`, so changes applied in `dict()` won't take place.
48+
"""
49+
data = self.dict(
50+
by_alias=by_alias,
51+
include=include,
52+
exclude=exclude,
53+
exclude_unset=exclude_unset,
54+
exclude_defaults=exclude_defaults,
55+
exclude_none=exclude_none,
56+
)
57+
if self.__custom_root_type__:
58+
data = data["__root__"]
59+
return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
2460

2561

2662
class Duration(int):

src/dstack/_internal/core/models/configurations.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pathlib import PurePosixPath
55
from typing import Any, Dict, List, Optional, Union
66

7+
import orjson
78
from pydantic import Field, ValidationError, conint, constr, root_validator, validator
89
from typing_extensions import Annotated, Literal
910

@@ -18,6 +19,9 @@
1819
from dstack._internal.core.models.services import AnyModel, OpenAIChatModel
1920
from dstack._internal.core.models.unix import UnixUser
2021
from dstack._internal.core.models.volumes import MountPoint, VolumeConfiguration, parse_mount_point
22+
from dstack._internal.utils.json_utils import (
23+
pydantic_orjson_dumps_with_indent,
24+
)
2125

2226
CommandsList = List[str]
2327
ValidPort = conint(gt=0, le=65536)
@@ -573,6 +577,9 @@ class DstackConfiguration(CoreModel):
573577
]
574578

575579
class Config:
580+
json_loads = orjson.loads
581+
json_dumps = pydantic_orjson_dumps_with_indent
582+
576583
@staticmethod
577584
def schema_extra(schema: Dict[str, Any]):
578585
schema["$schema"] = "http://json-schema.org/draft-07/schema#"

src/dstack/_internal/core/models/profiles.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
from enum import Enum
22
from typing import Any, Dict, List, Optional, Union, overload
33

4+
import orjson
45
from pydantic import Field, root_validator, validator
56
from typing_extensions import Annotated, Literal
67

78
from dstack._internal.core.models.backends.base import BackendType
89
from dstack._internal.core.models.common import CoreModel, Duration
910
from dstack._internal.utils.common import list_enum_values_for_annotation
11+
from dstack._internal.utils.json_utils import pydantic_orjson_dumps_with_indent
1012
from dstack._internal.utils.tags import tags_validator
1113

1214
DEFAULT_RETRY_DURATION = 3600
@@ -343,6 +345,9 @@ class ProfilesConfig(CoreModel):
343345
profiles: List[Profile]
344346

345347
class Config:
348+
json_loads = orjson.loads
349+
json_dumps = pydantic_orjson_dumps_with_indent
350+
346351
schema_extra = {"$schema": "http://json-schema.org/draft-07/schema#"}
347352

348353
def default(self) -> Optional[Profile]:

src/dstack/_internal/core/models/resources.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -382,14 +382,6 @@ def schema_extra(schema: Dict[str, Any]):
382382
gpu: Annotated[Optional[GPUSpec], Field(description="The GPU requirements")] = None
383383
disk: Annotated[Optional[DiskSpec], Field(description="The disk resources")] = DEFAULT_DISK
384384

385-
# TODO: Remove in 0.20. Added for backward compatibility.
386-
@root_validator
387-
def _post_validate(cls, values):
388-
cpu = values.get("cpu")
389-
if isinstance(cpu, CPUSpec) and cpu.arch in [None, gpuhunt.CPUArchitecture.X86]:
390-
values["cpu"] = cpu.count
391-
return values
392-
393385
def pretty_format(self) -> str:
394386
# TODO: Remove in 0.20. Use self.cpu directly
395387
cpu = parse_obj_as(CPUSpec, self.cpu)
@@ -407,3 +399,18 @@ def pretty_format(self) -> str:
407399
resources.update(disk_size=self.disk.size)
408400
res = pretty_resources(**resources)
409401
return res
402+
403+
def dict(self, *args, **kwargs) -> Dict:
404+
# super() does not work with pydantic-duality
405+
res = CoreModel.dict(self, *args, **kwargs)
406+
self._update_serialized_cpu(res)
407+
return res
408+
409+
# TODO: Remove in 0.20. Added for backward compatibility.
410+
def _update_serialized_cpu(self, values: Dict):
411+
cpu = values["cpu"]
412+
if cpu:
413+
arch = cpu.get("arch")
414+
count = cpu.get("count")
415+
if count and arch in [None, gpuhunt.CPUArchitecture.X86.value]:
416+
values["cpu"] = count

src/dstack/_internal/core/models/runs.py

Lines changed: 69 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -325,56 +325,45 @@ def duration(self) -> timedelta:
325325
end_time = self.finished_at
326326
return end_time - self.submitted_at
327327

328-
@root_validator
329-
def _status_message(cls, values) -> Dict:
330-
try:
331-
status = values["status"]
332-
termination_reason = values["termination_reason"]
333-
exit_code = values["exit_status"]
334-
except KeyError:
335-
return values
336-
values["status_message"] = JobSubmission._get_status_message(
337-
status=status,
338-
termination_reason=termination_reason,
339-
exit_status=exit_code,
340-
)
341-
return values
328+
def dict(self, *args, **kwargs) -> Dict:
329+
status_message = self._get_status_message()
330+
error = self._get_error()
331+
# super() does not work with pydantic-duality
332+
res = CoreModel.dict(self, *args, **kwargs)
333+
res["status_message"] = status_message
334+
res["error"] = error
335+
return res
342336

343-
@staticmethod
344-
def _get_status_message(
345-
status: JobStatus,
346-
termination_reason: Optional[JobTerminationReason],
347-
exit_status: Optional[int],
348-
) -> str:
349-
if status == JobStatus.DONE:
337+
def _get_status_message(self) -> Optional[str]:
338+
if self.status == JobStatus.DONE:
350339
return "exited (0)"
351-
elif status == JobStatus.FAILED:
352-
if termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR:
353-
return f"exited ({exit_status})"
354-
elif termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY:
340+
elif self.status == JobStatus.FAILED:
341+
if self.termination_reason == JobTerminationReason.CONTAINER_EXITED_WITH_ERROR:
342+
return f"exited ({self.exit_status})"
343+
elif (
344+
self.termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
345+
):
355346
return "no offers"
356-
elif termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY:
347+
elif self.termination_reason == JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY:
357348
return "interrupted"
358349
else:
359350
return "error"
360-
elif status == JobStatus.TERMINATED:
361-
if termination_reason == JobTerminationReason.TERMINATED_BY_USER:
351+
elif self.status == JobStatus.TERMINATED:
352+
if self.termination_reason == JobTerminationReason.TERMINATED_BY_USER:
362353
return "stopped"
363-
elif termination_reason == JobTerminationReason.ABORTED_BY_USER:
354+
elif self.termination_reason == JobTerminationReason.ABORTED_BY_USER:
364355
return "aborted"
365-
return status.value
356+
return self.status.value
366357

367-
@root_validator
368-
def _error(cls, values) -> Dict:
369-
try:
370-
termination_reason = values["termination_reason"]
371-
except KeyError:
372-
return values
373-
values["error"] = JobSubmission._get_error(termination_reason=termination_reason)
374-
return values
358+
def _get_error(self) -> Optional[str]:
359+
return JobSubmission._termination_reason_to_error(
360+
termination_reason=self.termination_reason
361+
)
375362

376363
@staticmethod
377-
def _get_error(termination_reason: Optional[JobTerminationReason]) -> Optional[str]:
364+
def _termination_reason_to_error(
365+
termination_reason: Optional[JobTerminationReason],
366+
) -> Optional[str]:
378367
error_mapping = {
379368
JobTerminationReason.INSTANCE_UNREACHABLE: "instance unreachable",
380369
JobTerminationReason.WAITING_INSTANCE_LIMIT_EXCEEDED: "waiting instance limit exceeded",
@@ -395,6 +384,12 @@ class Job(CoreModel):
395384
job_spec: JobSpec
396385
job_submissions: List[JobSubmission]
397386

387+
def get_last_termination_reason(self) -> Optional[JobTerminationReason]:
388+
for submission in reversed(self.job_submissions):
389+
if submission.termination_reason is not None:
390+
return submission.termination_reason
391+
return None
392+
398393

399394
class RunSpec(CoreModel):
400395
# TODO: run_name, working_dir are redundant here since they already passed in configuration
@@ -525,87 +520,70 @@ class Run(CoreModel):
525520
last_processed_at: datetime
526521
status: RunStatus
527522
status_message: Optional[str] = None
528-
termination_reason: Optional[RunTerminationReason]
523+
termination_reason: Optional[RunTerminationReason] = None
529524
run_spec: RunSpec
530525
jobs: List[Job]
531-
latest_job_submission: Optional[JobSubmission]
526+
latest_job_submission: Optional[JobSubmission] = None
532527
cost: float = 0
533528
service: Optional[ServiceSpec] = None
534529
deployment_num: int = 0 # default for compatibility with pre-0.19.14 servers
535530
# TODO: make error a computed field after migrating to pydanticV2
536531
error: Optional[str] = None
537532
deleted: Optional[bool] = None
538533

539-
@root_validator
540-
def _error(cls, values) -> Dict:
541-
try:
542-
termination_reason = values["termination_reason"]
543-
except KeyError:
544-
return values
545-
values["error"] = Run._get_error(termination_reason=termination_reason)
546-
return values
534+
def dict(self, *args, **kwargs) -> Dict:
535+
status_message = self._get_status_message()
536+
error = self._get_error()
537+
# super() does not work with pydantic-duality
538+
res = CoreModel.dict(self, *args, **kwargs)
539+
res["status_message"] = status_message
540+
res["error"] = error
541+
return res
542+
543+
def _get_error(self) -> Optional[str]:
544+
return Run._termination_reason_to_error(termination_reason=self.termination_reason)
547545

548546
@staticmethod
549-
def _get_error(termination_reason: Optional[RunTerminationReason]) -> Optional[str]:
547+
def _termination_reason_to_error(
548+
termination_reason: Optional[RunTerminationReason],
549+
) -> Optional[str]:
550550
if termination_reason == RunTerminationReason.RETRY_LIMIT_EXCEEDED:
551551
return "retry limit exceeded"
552552
elif termination_reason == RunTerminationReason.SERVER_ERROR:
553553
return "server error"
554554
else:
555555
return None
556556

557-
@root_validator
558-
def _status_message(cls, values) -> Dict:
557+
def _get_status_message(self) -> Optional[str]:
558+
if len(self.jobs) == 0:
559+
return self.status.value
560+
561+
last_job = self.jobs[0]
559562
# FIXME: status_message should not require all job submissions for status calculation
560563
# since it's very expensive and is not required for anything else.
561564
# May return a different status if not all job submissions requested.
562565
# TODO: Calculate status_message by looking at job models directly instead job submissions.
563-
try:
564-
status = values["status"]
565-
jobs: List[Job] = values["jobs"]
566-
retry_on_events = (
567-
jobs[0].job_spec.retry.on_events if jobs and jobs[0].job_spec.retry else []
568-
)
569-
job_status = (
570-
jobs[0].job_submissions[-1].status
571-
if len(jobs) == 1 and jobs[0].job_submissions
572-
else None
573-
)
574-
termination_reason = Run.get_last_termination_reason(jobs[0]) if jobs else None
575-
except KeyError:
576-
return values
577-
values["status_message"] = Run._get_status_message(
578-
status=status,
579-
job_status=job_status,
580-
retry_on_events=retry_on_events,
581-
termination_reason=termination_reason,
582-
)
583-
return values
566+
last_job_termination_reason = last_job.get_last_termination_reason()
584567

585-
@staticmethod
586-
def get_last_termination_reason(job: "Job") -> Optional[JobTerminationReason]:
587-
for submission in reversed(job.job_submissions):
588-
if submission.termination_reason is not None:
589-
return submission.termination_reason
590-
return None
568+
if len(self.jobs) == 1:
569+
# FIXME: Clarify why show "pulling" only in case of one job
570+
if (
571+
last_job.job_submissions
572+
and last_job.job_submissions[-1].status == JobStatus.PULLING
573+
):
574+
return "pulling"
591575

592-
@staticmethod
593-
def _get_status_message(
594-
status: RunStatus,
595-
job_status: Optional[JobStatus],
596-
retry_on_events: List[RetryEvent],
597-
termination_reason: Optional[JobTerminationReason],
598-
) -> str:
599-
if job_status == JobStatus.PULLING:
600-
return "pulling"
576+
retry_on_events = last_job.job_spec.retry.on_events if last_job.job_spec.retry else []
601577
# Currently, `retrying` is shown only for `no-capacity` events
602578
if (
603-
status in [RunStatus.SUBMITTED, RunStatus.PENDING]
604-
and termination_reason == JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
579+
self.status in [RunStatus.SUBMITTED, RunStatus.PENDING]
580+
and last_job_termination_reason
581+
== JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
605582
and RetryEvent.NO_CAPACITY in retry_on_events
606583
):
607584
return "retrying"
608-
return status.value
585+
586+
return self.status.value
609587

610588
def is_deployment_in_progress(self) -> bool:
611589
return any(

0 commit comments

Comments
 (0)