Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/aleph/sdk/client/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,61 @@ async def watch_messages(
elif msg.type == aiohttp.WSMsgType.ERROR:
break

async def get_store_estimated_price(
self,
storage_size_mib: int,
) -> PriceResponse:
"""
Get the estimated price for a store operation.

:param storage_size_mib: size in mib you want to store
:return: Price response with cost information
"""
content = {
"address": "0xWeDoNotNeedARealAddress",
"time": time.time(),
"item_type": ItemType.storage,
"estimated_size_mib": storage_size_mib,
"item_hash": compute_sha256("dummy_value"),
}

item_content: str = json.dumps(
content,
separators=(",", ":"),
default=extended_json_encoder,
)

message_dict = dict(
sender=content["address"],
chain=Chain.ETH,
type=MessageType.store,
content=content,
item_content=item_content,
time=time.time(),
channel=settings.DEFAULT_CHANNEL,
item_type=ItemType.inline,
item_hash=compute_sha256(item_content),
signature="0x" + "0" * 130, # Add a dummy signature to pass validation
)

message = parse_message(message_dict)

async with self.http_session.post(
"/api/v0/price/estimate", json=dict(message=message)
) as resp:
try:
resp.raise_for_status()
response_json = await resp.json()
cost = response_json.get("cost", None)

return PriceResponse(
cost=cost,
required_tokens=response_json["required_tokens"],
payment_type=response_json["payment_type"],
)
except aiohttp.ClientResponseError as e:
raise e

async def get_estimated_price(
self,
content: ExecutableContent,
Expand Down
138 changes: 114 additions & 24 deletions src/aleph/sdk/client/services/crn.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from datetime import datetime
from typing import TYPE_CHECKING, Dict, List, Optional, Union

import aiohttp
from aiohttp.client_exceptions import ClientResponseError
from aleph_message.models import ItemHash
from pydantic import BaseModel
from pydantic import BaseModel, NonNegativeInt, PositiveInt

from aleph.sdk.conf import settings
from aleph.sdk.exceptions import MethodNotAvailableOnCRN, VmNotFoundOnHost
Expand All @@ -13,22 +14,81 @@
CrnV1List,
CrnV2List,
DictLikeModel,
VmResources,
)
from aleph.sdk.utils import extract_valid_eth_address, sanitize_url

if TYPE_CHECKING:
from aleph.sdk.client.http import AlephHttpClient


class CpuLoad(BaseModel):
load1: float
load5: float
load15: float


class CoreFrequencies(BaseModel):
min: float
max: float


class CpuInfo(BaseModel):
count: PositiveInt
load_average: CpuLoad
core_frequencies: CoreFrequencies


class CpuProperties(BaseModel):
architecture: str
vendor: str
features: List[str] = []


class MemoryInfo(BaseModel):
total_kB: PositiveInt
available_kB: NonNegativeInt


class DiskInfo(BaseModel):
total_kB: PositiveInt
available_kB: NonNegativeInt


class UsagePeriod(BaseModel):
start_timestamp: datetime
duration_seconds: int


class Properties(BaseModel):
cpu: CpuProperties


class GPU(BaseModel):
vendor: str
model: str
device_name: str
device_class: str
pci_host: str
device_id: str
compatible: bool


class GpuUsages(BaseModel):
devices: List[GPU] = []
available_devices: List[GPU] = []


class SystemUsage(BaseModel):
cpu: CpuInfo
mem: MemoryInfo
disk: DiskInfo
period: UsagePeriod
properties: Properties
gpu: GpuUsages
active: bool


class NetworkGPUS(BaseModel):
total_gpu_count: int
available_gpu_count: int
Expand All @@ -47,6 +107,7 @@ class CRN(DictLikeModel):
gpu_support: Optional[bool] = False
confidential_support: Optional[bool] = False
qemu_support: Optional[bool] = False
system_usage: Optional[SystemUsage] = None

version: Optional[str] = "0.0.0"
payment_receiver_address: Optional[str] # Can be None if not configured
Expand All @@ -71,20 +132,20 @@ def find_gpu_on_network(self):
compatible_gpu: Dict[str, List[GPU]] = {}
available_compatible_gpu: Dict[str, List[GPU]] = {}

for crn_ in self.crns:
if not crn_.gpu_support:
for crn in self.crns:
if not crn.gpu_support:
continue

# Extracts used GPU
for gpu in crn_.get("compatible_gpus", []):
compatible_gpu[crn_.address] = []
compatible_gpu[crn_.address].append(GPU.model_validate(gpu))
for gpu in crn.get("compatible_gpus", []):
compatible_gpu[crn.address] = []
compatible_gpu[crn.address].append(GPU.model_validate(gpu))
gpu_count += 1

# Extracts available GPU
for gpu in crn_.get("compatible_available_gpus", []):
available_compatible_gpu[crn_.address] = []
available_compatible_gpu[crn_.address].append(GPU.model_validate(gpu))
for gpu in crn.get("compatible_available_gpus", []):
available_compatible_gpu[crn.address] = []
available_compatible_gpu[crn.address].append(GPU.model_validate(gpu))
gpu_count += 1
available_gpu_count += 1

Expand All @@ -102,6 +163,7 @@ def filter_crn(
stream_address: bool = False,
confidential: bool = False,
gpu: bool = False,
vm_resources: Optional[VmResources] = None,
) -> list[CRN]:
"""Filter compute resource node list, unfiltered by default.
Args:
Expand All @@ -110,51 +172,79 @@ def filter_crn(
stream_address (bool): Filter invalid payment receiver address.
confidential (bool): Filter by confidential computing support.
gpu (bool): Filter by GPU support.
vm_resources (VmResources): Filter by VM need, vcpus, memory, disk.
Returns:
list[CRN]: List of compute resource nodes. (if no filter applied, return all)
"""

filtered_crn: list[CRN] = []
for crn_ in self.crns:
for crn in self.crns:
# Check crn version
if crn_version and (crn_.version or "0.0.0") < crn_version:
if crn_version and (crn.version or "0.0.0") < crn_version:
continue

# Filter with ipv6 check
if ipv6:
ipv6_check = crn_.get("ipv6_check")
if not ipv6_check or not all(ipv6_check.values()):
ipv6_check = crn.get("ipv6_check")

# The diagnostic VM has an issue where it can fail even when it is working correctly.
# To avoid ending up with only a few working CRNs, we only ensure that the
# `ipv6_check` field exists, which means the VM ran, even if the test failed.

if not ipv6_check: # or not all(ipv6_check.values())
continue

if stream_address and not extract_valid_eth_address(
crn_.payment_receiver_address or ""
crn.payment_receiver_address or ""
):
continue

# Confidential Filter
if confidential and not crn_.confidential_support:
if confidential and not crn.confidential_support:
continue

# Filter with GPU / Available GPU
available_gpu = crn_.get("compatible_available_gpus")
if gpu and (not crn_.gpu_support or not available_gpu):
available_gpu = crn.get("compatible_available_gpus")
if gpu and (not crn.gpu_support or not available_gpu):
continue

filtered_crn.append(crn_)
# Filter VM resources
if vm_resources:
sys = crn.system_usage
if not sys:
continue

# Check CPU count
if sys.cpu.count < vm_resources.vcpus:
continue

# Convert MiB to kB (1 MiB = 1024 kB) for proper comparison
memory_kb_required = vm_resources.memory * 1024
disk_kb_required = vm_resources.disk_mib * 1024

# Check free memory
if sys.mem.available_kB < memory_kb_required:
continue

# Check free disk
if sys.disk.available_kB < disk_kb_required:
continue

filtered_crn.append(crn)
return filtered_crn

# Find CRN by address
def find_crn_by_address(self, address: str) -> Optional[CRN]:
for crn_ in self.crns:
if crn_.address == sanitize_url(address):
return crn_
for crn in self.crns:
if crn.address == sanitize_url(address):
return crn
return None

# Find CRN by hash
def find_crn_by_hash(self, crn_hash: str) -> Optional[CRN]:
for crn_ in self.crns:
if crn_.hash == crn_hash:
return crn_
for crn in self.crns:
if crn.hash == crn_hash:
return crn
return None

def find_crn(
Expand Down
6 changes: 6 additions & 0 deletions src/aleph/sdk/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,9 @@ class Voucher(BaseModel):
image: str
icon: str
attributes: list[VoucherAttribute]


class VmResources(BaseModel):
vcpus: int
memory: int
disk_mib: int
Loading