Skip to content

Commit

Permalink
refactor: orchestrator client return algo objects (#1272)
Browse files Browse the repository at this point in the history
In this PR I removed the `Algo` class I had introduced earlier (`substrapp.compute_tasks.algo.Algo`) and replaced it with `orchestrator.Algo`, which makes mocking easier. The only drawback is that the query that retrieves the algo asset now has to be made in the image builder. That is why I introduced the `Datastore` class: it makes the asset retrieval easier to mock and opens the possibility of supporting another kind of storage without having to change all our code.
  • Loading branch information
AlexandrePicosson authored Aug 30, 2022
1 parent 3144e30 commit 1e81bdf
Show file tree
Hide file tree
Showing 19 changed files with 197 additions and 179 deletions.
2 changes: 2 additions & 0 deletions backend/orchestrator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .client import OrchestratorClient as Client
from .resources import Address
from .resources import Algo
from .resources import AssetKind
from .resources import ComputePlan
from .resources import ComputePlanStatus
Expand Down Expand Up @@ -34,4 +35,5 @@
"Client",
"ComputePlan",
"ComputePlanStatus",
"Algo",
)
20 changes: 10 additions & 10 deletions backend/orchestrator/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import time
from copy import deepcopy
from functools import wraps
from typing import Generator

import grpc
import structlog
Expand Down Expand Up @@ -33,6 +34,7 @@
from orchestrator.model_pb2_grpc import ModelServiceStub
from orchestrator.organization_pb2_grpc import OrganizationServiceStub
from orchestrator.performance_pb2_grpc import PerformanceServiceStub
from orchestrator.resources import Algo
from orchestrator.resources import ComputePlan
from orchestrator.resources import ComputePlanStatus
from orchestrator.resources import ComputeTask
Expand Down Expand Up @@ -213,27 +215,25 @@ def update_algo(self, args):
return MessageToDict(data, **CONVERT_SETTINGS)

@grpc_retry
def query_algo(self, key):
def query_algo(self, key) -> Algo:
data = self._algo_client.GetAlgo(algo_pb2.GetAlgoParam(key=key), metadata=self._metadata)
return MessageToDict(data, **CONVERT_SETTINGS)
return Algo.from_grpc(data)

@grpc_retry
def query_algos(self, compute_plan_key=None):
def query_algos(self, compute_plan_key=None) -> Generator[Algo, None, None]:
algo_filter = algo_pb2.AlgoQueryFilter(compute_plan_key=compute_plan_key)
res = []
page_token = "" # nosec
while True:
data = self._algo_client.QueryAlgos(
algo_pb2.QueryAlgosParam(filter=algo_filter, page_token=page_token),
metadata=self._metadata,
)
data = MessageToDict(data, **CONVERT_SETTINGS)
algos = data.get("Algos", [])
page_token = data.get("next_page_token")
res.extend(algos)
if page_token == "" or not algos: # nosec
for algo in data.Algos:
yield Algo.from_grpc(algo)

page_token = data.next_page_token
if page_token == "" or len(data.Algos) == 0: # nosec
break
return res

@grpc_retry
def register_datasamples(self, args):
Expand Down
10 changes: 10 additions & 0 deletions backend/orchestrator/mock.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import factory

from .resources import Address
from .resources import Algo
from .resources import ComputePlan
from .resources import ComputePlanStatus
from .resources import ComputeTask
Expand Down Expand Up @@ -61,6 +62,15 @@ class Meta:
opener = factory.SubFactory(AddressFactory)


class AlgoFactory(factory.Factory):
    """Factory building ``Algo`` resources.

    Each instance gets a Faker-generated ``uuid4`` key, the fixed owner
    ``"OrgA"`` and an ``algorithm`` address produced by ``AddressFactory``.
    """

    class Meta:
        model = Algo

    key = factory.Faker("uuid4")
    owner = "OrgA"
    algorithm = factory.SubFactory(AddressFactory)


class ComputePlanFactory(factory.Factory):
class Meta:
model = ComputePlan
Expand Down
15 changes: 15 additions & 0 deletions backend/orchestrator/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pydantic

from orchestrator import algo_pb2
from orchestrator import common_pb2
from orchestrator import computeplan_pb2
from orchestrator import computetask_pb2
Expand Down Expand Up @@ -99,6 +100,20 @@ def from_grpc(cls, m: datamanager_pb2.DataManager) -> "DataManager":
return cls(key=m.key, opener=Address.from_grpc(m.opener))


class Algo(pydantic.BaseModel):
    """Algo asset as returned by the orchestrator client.

    Only the fields copied by ``from_grpc`` are kept: the algo key, its owner
    and the address of the algorithm.
    """

    key: str
    owner: str
    algorithm: Address

    @classmethod
    def from_grpc(cls, a: algo_pb2.Algo) -> "Algo":
        """Build an ``Algo`` from the corresponding gRPC message.

        Args:
            a: the ``algo_pb2.Algo`` message to convert

        Returns:
            Algo: a model holding the message's key, owner and algorithm address
        """
        return cls(
            key=a.key,
            owner=a.owner,
            algorithm=Address.from_grpc(a.algorithm),
        )


class ComputeTaskStatus(AutoNameEnum):
STATUS_UNKNOWN = enum.auto()
STATUS_WAITING = enum.auto()
Expand Down
45 changes: 0 additions & 45 deletions backend/substrapp/compute_tasks/algo.py

This file was deleted.

2 changes: 1 addition & 1 deletion backend/substrapp/compute_tasks/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def get_performance_filename(algo_key: str) -> str:


def get_exec_command(ctx: Context) -> list[str]:
entrypoint = ImageEntrypoint.objects.get(algo_checksum=ctx.algo.checksum)
entrypoint = ImageEntrypoint.objects.get(algo_checksum=ctx.algo.algorithm.checksum)

command = entrypoint.entrypoint_json

Expand Down
8 changes: 3 additions & 5 deletions backend/substrapp/compute_tasks/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from django.conf import settings

import orchestrator
from substrapp.compute_tasks.algo import Algo
from substrapp.compute_tasks.compute_pod import ComputePod
from substrapp.compute_tasks.directories import SANDBOX_DIR
from substrapp.compute_tasks.directories import Directories
Expand Down Expand Up @@ -38,7 +37,7 @@ class Context:
_compute_plan: orchestrator.ComputePlan
_input_assets: list[orchestrator.ComputeTaskInputAsset]
_directories: Directories
_algo: Algo
_algo: orchestrator.Algo
_has_chainkeys: bool
_outputs: dict[str, str]

Expand All @@ -48,7 +47,7 @@ def __init__(
task: orchestrator.ComputeTask,
compute_plan: orchestrator.ComputePlan,
input_assets: list[orchestrator.ComputeTaskInputAsset],
algo: Algo,
algo: orchestrator.Algo,
directories: Directories,
has_chainkeys: bool,
):
Expand All @@ -70,7 +69,6 @@ def from_task(cls, channel_name: str, task: orchestrator.ComputeTask):
compute_plan = client.query_compute_plan(compute_plan_key)
input_assets = client.get_task_input_assets(task.key)
algo = client.query_algo(task.algo_key)
algo = Algo(channel_name, algo)

logger.debug("retrieved input assets from orchestrator", input_assets=input_assets)

Expand Down Expand Up @@ -118,7 +116,7 @@ def input_models(self) -> list[orchestrator.Model]:
return [input.model for input in self._input_assets if input.kind == orchestrator.AssetKind.ASSET_MODEL]

@property
def algo(self) -> Algo:
def algo(self) -> orchestrator.Algo:
return self._algo

@property
Expand Down
19 changes: 19 additions & 0 deletions backend/substrapp/compute_tasks/datastore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import orchestrator
import substrapp.clients.organization as organization_client


class Datastore:
    """Retrieve asset content from organizations for a given channel.

    All retrievals go through ``_get_from_address``, so that single method is
    the place to override when the storage has to be replaced.
    """

    def __init__(self, channel: str) -> None:
        """Bind the datastore to ``channel``, used for every retrieval."""
        self.channel = channel

    def _get_from_address(self, organization: str, address: orchestrator.Address) -> bytes:
        """Fetch the content stored at ``address`` from ``organization``.

        The address checksum is forwarded to the organization client along
        with the address uri.
        """
        return organization_client.get(
            channel=self.channel,
            organization_id=organization,
            url=address.uri,
            checksum=address.checksum,
        )

    def get_algo(self, algo: orchestrator.Algo) -> bytes:
        """Return the content found at the algo's ``algorithm`` address, requested from the algo owner."""
        return self._get_from_address(algo.owner, algo.algorithm)


def get_datastore(channel: str) -> Datastore:
    """Return a ``Datastore`` bound to ``channel``."""
    return Datastore(channel)
7 changes: 4 additions & 3 deletions backend/substrapp/compute_tasks/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from substrapp.compute_tasks import compute_task as task_utils
from substrapp.compute_tasks import errors as compute_task_errors
from substrapp.compute_tasks import utils
from substrapp.compute_tasks.command import get_exec_command
from substrapp.compute_tasks.compute_pod import ComputePod
from substrapp.compute_tasks.compute_pod import Label
Expand All @@ -37,14 +38,14 @@

@timeit
def execute_compute_task(ctx: Context) -> None:
algo = ctx.algo
channel_name = ctx.channel_name
container_image_tag = utils.container_image_tag_from_algo(ctx.algo)

compute_pod = ctx.get_compute_pod(algo.key)
compute_pod = ctx.get_compute_pod(container_image_tag)
pod_name = compute_pod.name

env = get_environment(ctx)
image = get_container_image_name(algo.container_image_tag)
image = get_container_image_name(container_image_tag)
exec_command = get_exec_command(ctx)

k8s_client = _get_k8s_client()
Expand Down
24 changes: 14 additions & 10 deletions backend/substrapp/compute_tasks/image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import structlog
from django.conf import settings

import orchestrator
from substrapp.compute_tasks import errors as compute_task_errors
from substrapp.compute_tasks.algo import Algo
from substrapp.compute_tasks import utils
from substrapp.compute_tasks.compute_pod import Label
from substrapp.compute_tasks.datastore import Datastore
from substrapp.compute_tasks.volumes import get_docker_cache_pvc_name
from substrapp.compute_tasks.volumes import get_worker_subtuple_pvc_name
from substrapp.docker_registry import USER_IMAGE_REPOSITORY
Expand Down Expand Up @@ -39,18 +41,20 @@
HOSTNAME = settings.HOSTNAME


def build_image_if_missing(algo: Algo) -> None:
def build_image_if_missing(datastore: Datastore, algo: orchestrator.Algo) -> None:
"""
Build the container image and the ImageEntryPoint entry if they don't exist already
"""
with lock_resource("image-build", algo.container_image_tag, ttl=MAX_IMAGE_BUILD_TIME, timeout=MAX_IMAGE_BUILD_TIME):
if container_image_exists(algo.container_image_tag):
logger.info("Reusing existing image", image=algo.container_image_tag)
container_image_tag = utils.container_image_tag_from_algo(algo)
with lock_resource("image-build", container_image_tag, ttl=MAX_IMAGE_BUILD_TIME, timeout=MAX_IMAGE_BUILD_TIME):
if container_image_exists(container_image_tag):
logger.info("Reusing existing image", image=container_image_tag)
else:
_build_asset_image(algo)
asset_content = datastore.get_algo(algo)
_build_asset_image(asset_content, algo)


def _build_asset_image(algo: Algo) -> None:
def _build_asset_image(asset: bytes, algo: orchestrator.Algo) -> None:
"""
Build an asset's container image. Perform multiple steps:
1. Download the asset (algo or metric) using the provided asset storage_address/owner. Verify its checksum and
Expand All @@ -64,16 +68,16 @@ def _build_asset_image(algo: Algo) -> None:

with TemporaryDirectory(dir=SUBTUPLE_TMP_DIR) as tmp_dir:
# Download source
uncompress_content(algo.archive, tmp_dir)
uncompress_content(asset, tmp_dir)

# Extract ENTRYPOINT from Dockerfile
entrypoint = _get_entrypoint_from_dockerfile(tmp_dir)

# Build image
_build_container_image(tmp_dir, algo.container_image_tag)
_build_container_image(tmp_dir, utils.container_image_tag_from_algo(algo))

# Save entrypoint to DB if the image build was successful
ImageEntrypoint.objects.get_or_create(algo_checksum=algo.checksum, entrypoint_json=entrypoint)
ImageEntrypoint.objects.get_or_create(algo_checksum=algo.algorithm.checksum, entrypoint_json=entrypoint)


def _get_entrypoint_from_dockerfile(dockerfile_dir: str) -> list[str]:
Expand Down
13 changes: 13 additions & 0 deletions backend/substrapp/compute_tasks/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import orchestrator


def container_image_tag_from_algo(algo: orchestrator.Algo) -> str:
    """Build the container image tag from the algo checksum.

    The tag is ``algo-`` followed by the first 16 characters of the checksum
    of the algo's ``algorithm`` address.

    Args:
        algo: an algo retrieved from the orchestrator

    Returns:
        str: the container image tag
    """
    return f"algo-{algo.algorithm.checksum[:16]}"
20 changes: 15 additions & 5 deletions backend/substrapp/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# flake8: noqa
from .tasks_compute_plan import *
from .tasks_compute_task import *
from .tasks_docker_registry import *
from .tasks_remove_intermediary_models import *
from substrapp.tasks.tasks_compute_plan import delete_cp_pod_and_dirs_and_optionally_images
from substrapp.tasks.tasks_compute_task import compute_task
from substrapp.tasks.tasks_docker_registry import clean_old_images_task
from substrapp.tasks.tasks_docker_registry import docker_registry_garbage_collector_task
from substrapp.tasks.tasks_remove_intermediary_models import remove_intermediary_models_from_buffer
from substrapp.tasks.tasks_remove_intermediary_models import remove_intermediary_models_from_db

__all__ = [
"delete_cp_pod_and_dirs_and_optionally_images",
"compute_task",
"clean_old_images_task",
"docker_registry_garbage_collector_task",
"remove_intermediary_models_from_db",
"remove_intermediary_models_from_buffer",
]
13 changes: 7 additions & 6 deletions backend/substrapp/tasks/tasks_compute_plan.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import typing

import structlog
from django.conf import settings

import orchestrator
from backend.celery import app
from substrapp.compute_tasks.algo import Algo
from substrapp.compute_tasks import utils
from substrapp.compute_tasks.compute_pod import delete_compute_plan_pods
from substrapp.compute_tasks.directories import Directories
from substrapp.compute_tasks.directories import teardown_compute_plan_dir
Expand Down Expand Up @@ -57,7 +59,7 @@ def _teardown_compute_plan_ressources(orc_client: orchestrator.Client, compute_p
return
_teardown_pods_and_dirs(compute_plan_key)

_delete_compute_plan_algos_images(orc_client, compute_plan_key)
_delete_compute_plan_algos_images(orc_client.query_algos(compute_plan_key))


def _teardown_pods_and_dirs(compute_plan_key: str) -> None:
Expand All @@ -66,7 +68,6 @@ def _teardown_pods_and_dirs(compute_plan_key: str) -> None:
teardown_compute_plan_dir(Directories(compute_plan_key))


def _delete_compute_plan_algos_images(orc_client: orchestrator.Client, compute_plan_key: str) -> None:
for orc_algo in orc_client.query_algos(compute_plan_key):
algo = Algo("", orc_algo)
delete_container_image_safe(algo.container_image_tag)
def _delete_compute_plan_algos_images(algos: typing.Iterable[orchestrator.Algo]) -> None:
for algo in algos:
delete_container_image_safe(utils.container_image_tag_from_algo(algo))
Loading

0 comments on commit 1e81bdf

Please sign in to comment.