Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add create_instance method #78

Merged
merged 4 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/aleph/sdk/client/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from aleph_message.status import MessageStatus

from ..query.filters import MessageFilter, PostFilter
from ..query.params import VmParams
from ..query.responses import PostsResponse
from ..types import GenericMessage, StorageEnum
from ..utils import Writable
Expand Down Expand Up @@ -315,6 +316,9 @@ async def create_program(
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
persistent: bool = False,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
subscriptions: Optional[List[Mapping]] = None,
Expand All @@ -335,13 +339,62 @@ async def create_program(
:param vcpus: Number of vCPUs to allocate (Default: 1)
:param timeout_seconds: Timeout in seconds (Default: 30.0)
:param persistent: Whether the program should be persistent or not (Default: False)
:param allow_amend: Whether the deployed VM image may be changed (Default: False)
:param internet: Whether the VM should have internet connectivity. (Default: True)
:param aleph_api: Whether the VM needs access to Aleph messages API (Default: True)
:param encoding: Encoding to use (Default: Encoding.zip)
:param volumes: Volumes to mount
:param subscriptions: Patterns of aleph.im messages to forward to the program's event receiver
:param metadata: Metadata to attach to the message
"""
pass

@abstractmethod
async def create_instance(
self,
rootfs: str,
rootfs_size: int,
rootfs_name: str,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
ssh_keys: Optional[List[str]] = None,
metadata: Optional[Mapping[str, Any]] = None,
) -> Tuple[AlephMessage, MessageStatus]:
"""
Post a (create) PROGRAM message.

:param rootfs: Root filesystem to use
:param rootfs_size: Size of root filesystem
:param rootfs_name: Name of root filesystem
:param environment_variables: Environment variables to pass to the program
:param storage_engine: Storage engine to use (Default: "storage")
:param channel: Channel to use (Default: "TEST")
:param address: Address to use (Default: account.get_address())
:param sync: If true, waits for the message to be processed by the API server
:param memory: Memory in MB for the VM to be allocated (Default: 128)
:param vcpus: Number of vCPUs to allocate (Default: 1)
:param timeout_seconds: Timeout in seconds (Default: 30.0)
:param allow_amend: Whether the deployed VM image may be changed (Default: False)
:param internet: Whether the VM should have internet connectivity. (Default: True)
:param aleph_api: Whether the VM needs access to Aleph messages API (Default: True)
:param encoding: Encoding to use (Default: Encoding.zip)
:param volumes: Volumes to mount
:param ssh_keys: SSH keys to authorize access to the VM
:param metadata: Metadata to attach to the message
"""
pass

@abstractmethod
async def forget(
self,
Expand Down
93 changes: 86 additions & 7 deletions src/aleph/sdk/client/authenticated_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,22 @@
ProgramContent,
ProgramMessage,
StoreContent,
StoreMessage,
StoreMessage, InstanceMessage, InstanceContent,
)
from aleph_message.models.execution.base import Encoding
from aleph_message.models.execution.environment import (
FunctionEnvironment,
MachineResources,
)
from aleph_message.models.execution.instance import RootfsVolume
from aleph_message.models.execution.program import CodeContent, FunctionRuntime
from aleph_message.models.execution.volume import MachineVolume
from aleph_message.models.execution.volume import MachineVolume, ParentVolume
from aleph_message.status import MessageStatus
from pydantic.json import pydantic_encoder

from ..conf import settings
from ..exceptions import BroadcastError, InvalidMessageError
from ..query.params import VmParams
from ..types import Account, StorageEnum
from .abstract import AuthenticatedAlephClient
from .http import AlephHttpClient
Expand Down Expand Up @@ -401,6 +403,9 @@ async def create_program(
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
persistent: bool = False,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
subscriptions: Optional[List[Mapping]] = None,
Expand Down Expand Up @@ -434,7 +439,7 @@ async def create_program(
content = ProgramContent(
type="vm-function",
address=address,
allow_amend=False,
allow_amend=allow_amend,
code=CodeContent(
encoding=encoding,
entrypoint=entrypoint,
Expand All @@ -444,8 +449,8 @@ async def create_program(
on=triggers,
environment=FunctionEnvironment(
reproducible=False,
internet=True,
aleph_api=True,
internet=internet,
aleph_api=aleph_api,
),
variables=environment_variables,
resources=MachineResources(
Expand All @@ -460,7 +465,9 @@ async def create_program(
if runtime == settings.DEFAULT_RUNTIME_ID
else "",
),
volumes=volumes,
volumes=[
MachineVolume.parse_obj(volume) for volume in volumes
],
time=time.time(),
metadata=metadata,
)
Expand All @@ -470,7 +477,79 @@ async def create_program(

return await self.submit(
content=content.dict(exclude_none=True),
message_type=MessageType.program,
message_type=MessageType.instance,
channel=channel,
storage_engine=storage_engine,
sync=sync,
)

async def create_instance(
self,
rootfs: str,
rootfs_size: int,
rootfs_name: str,
environment_variables: Optional[Mapping[str, str]] = None,
storage_engine: StorageEnum = StorageEnum.storage,
channel: Optional[str] = None,
address: Optional[str] = None,
sync: bool = False,
memory: Optional[int] = None,
vcpus: Optional[int] = None,
timeout_seconds: Optional[float] = None,
allow_amend: bool = False,
internet: bool = True,
aleph_api: bool = True,
encoding: Encoding = Encoding.zip,
volumes: Optional[List[Mapping]] = None,
volume_persistence: str = "host",
ssh_keys: Optional[List[str]] = None,
metadata: Optional[Mapping[str, Any]] = None,
) -> Tuple[InstanceMessage, MessageStatus]:
address = address or settings.ADDRESS_TO_USE or self.account.get_address()

volumes = volumes if volumes is not None else []
memory = memory or settings.DEFAULT_VM_MEMORY
vcpus = vcpus or settings.DEFAULT_VM_VCPUS
timeout_seconds = timeout_seconds or settings.DEFAULT_VM_TIMEOUT

content = InstanceContent(
address=address,
allow_amend=allow_amend,
environment=FunctionEnvironment(
reproducible=False,
internet=internet,
aleph_api=aleph_api,
),
variables=environment_variables,
resources=MachineResources(
vcpus=vcpus,
memory=memory,
seconds=timeout_seconds,
),
rootfs=RootfsVolume(
parent=ParentVolume(
ref=rootfs,
use_latest=True,
),
name=rootfs_name,
size_mib=rootfs_size,
persistence="host",
use_latest=True,
comment="Official Aleph Debian root filesystem"
if rootfs == settings.DEFAULT_RUNTIME_ID
else "",
),
volumes=[
MachineVolume.parse_obj(volume) for volume in volumes
],
time=time.time(),
authorized_keys=ssh_keys,
metadata=metadata,
)

return await self.submit(
content=content.dict(exclude_none=True),
message_type=MessageType.instance,
channel=channel,
storage_engine=storage_engine,
sync=sync,
Expand Down
17 changes: 16 additions & 1 deletion tests/unit/test_asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
ForgetMessage,
PostMessage,
ProgramMessage,
StoreMessage,
StoreMessage, InstanceMessage,
)
from aleph_message.status import MessageStatus

Expand Down Expand Up @@ -141,6 +141,21 @@ async def test_create_program(mock_session_with_post_success):
assert isinstance(program_message, ProgramMessage)


@pytest.mark.asyncio
async def test_create_instance(mock_session_with_post_success):
async with mock_session_with_post_success as session:
instance_message, message_status = await session.create_instance(
rootfs="cafecafecafecafecafecafecafecafecafecafecafecafecafecafecafecafe",
rootfs_size=1,
rootfs_name="rootfs",
channel="TEST",
metadata={"tags": ["test"]},
)

assert mock_session_with_post_success.http_session.post.called_once
assert isinstance(instance_message, InstanceMessage)


@pytest.mark.asyncio
async def test_forget(mock_session_with_post_success):
async with mock_session_with_post_success as session:
Expand Down
Loading