Skip to content

Check community payment flow #751

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/aleph/vm/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,8 @@ class Settings(BaseSettings):

# Settings to get from the network aggregates
SETTINGS_AGGREGATE_ADDRESS: str = "0xFba561a84A537fCaa567bb7A2257e7142701ae2A"
COMMUNITY_WALLET_ADDRESS: str | None = None
COMPATIBLE_GPUS: List[dict[str, str]] = []

# Tests on programs

FAKE_DATA_PROGRAM: Path | None = None
BENCHMARK_FAKE_DATA_PROGRAM = Path(abspath(join(__file__, "../../../../examples/example_fastapi")))

Expand Down
60 changes: 49 additions & 11 deletions src/aleph/vm/orchestrator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import math
import time
from collections.abc import AsyncIterable
from decimal import Decimal
from typing import TypeVar

import aiohttp
Expand All @@ -19,6 +20,10 @@
from yarl import URL

from aleph.vm.conf import settings
from aleph.vm.orchestrator.utils import (
get_community_wallet_address,
is_after_community_wallet_start,
)
from aleph.vm.pool import VmPool
from aleph.vm.utils import create_task_log_exceptions

Expand All @@ -35,6 +40,7 @@
logger = logging.getLogger(__name__)

Value = TypeVar("Value")
COMMUNITY_STREAM_RATIO = Decimal(0.2)


async def retry_generator(generator: AsyncIterable[Value], max_seconds: int = 8) -> AsyncIterable[Value]:
Expand Down Expand Up @@ -154,6 +160,7 @@
try:
logger.debug("Monitoring balances task running")
await check_payment(pool)
logger.debug("Monitoring balances task ended")

Check warning on line 163 in src/aleph/vm/orchestrator/tasks.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/tasks.py#L163

Added line #L163 was not covered by tests
except Exception as e:
# Catch all exceptions as to never stop the task.
logger.warning(f"check_payment failed {e}", exc_info=True)
Expand Down Expand Up @@ -191,31 +198,62 @@
logger.debug(f"Stopping {last_execution} due to insufficient balance")
await pool.stop_vm(last_execution.vm_hash)
required_balance = await compute_required_balance(executions)
community_wallet = await get_community_wallet_address()
if not community_wallet:
logger.error("Monitor payment ERROR: No community wallet set. Cannot check community payment")

Check warning on line 203 in src/aleph/vm/orchestrator/tasks.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/tasks.py#L203

Added line #L203 was not covered by tests

# Check if the balance held in the wallet is sufficient stream tier resources
for sender, chains in pool.get_executions_by_sender(payment_type=PaymentType.superfluid).items():
for chain, executions in chains.items():
try:
stream = await get_stream(sender=sender, receiver=settings.PAYMENT_RECEIVER_ADDRESS, chain=chain)

logger.debug(
f"Get stream flow from Sender {sender} to Receiver {settings.PAYMENT_RECEIVER_ADDRESS} of {stream}"
f"Stream flow from {sender} to {settings.PAYMENT_RECEIVER_ADDRESS} = {stream} {chain.value}"
)
except ValueError as error:
logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}")
continue

Check warning on line 216 in src/aleph/vm/orchestrator/tasks.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/tasks.py#L214-L216

Added lines #L214 - L216 were not covered by tests
try:
community_stream = await get_stream(sender=sender, receiver=community_wallet, chain=chain)
logger.debug(f"Stream flow from {sender} to {community_wallet} (community) : {stream} {chain}")

except ValueError as error:
logger.error(f"Error found getting stream for chain {chain} and sender {sender}: {error}")
continue

required_stream = await compute_required_flow(executions)
logger.debug(f"Required stream for Sender {sender} executions: {required_stream}")
# Stop executions until the required stream is reached
while (stream + settings.PAYMENT_BUFFER) < required_stream:
try:
last_execution = executions.pop(-1)
except IndexError: # Empty list
logger.debug("No execution can be maintained due to insufficient stream")
while executions:
executions_with_community = [
execution
for execution in executions
if await is_after_community_wallet_start(execution.times.started_at)
]

required_stream = await compute_required_flow(executions_with_community)
executions_without_community = [
execution
for execution in executions
if not await is_after_community_wallet_start(execution.times.started_at)
]
logger.info("flow community %s", executions_with_community)
logger.info("flow without community %s", executions_without_community)
required_stream_without_community = await compute_required_flow(executions_without_community)

required_crn_stream = required_stream * (1 - COMMUNITY_STREAM_RATIO) + required_stream_without_community
required_community_stream = required_stream * COMMUNITY_STREAM_RATIO
logger.debug(
f"Stream for senders {sender} {len(executions)} executions. CRN : {stream} / {required_crn_stream}."
f"Community: {community_stream} / {required_community_stream}"
)
# Can pay all executions
if (stream + settings.PAYMENT_BUFFER) > required_crn_stream and (
community_stream + settings.PAYMENT_BUFFER
) > required_community_stream:
break
logger.debug(f"Stopping {last_execution} due to insufficient stream")
# Stop executions until the required stream is reached
last_execution = executions.pop(-1)
logger.info(f"Stopping {last_execution} of {sender} due to insufficient stream")
await pool.stop_vm(last_execution.vm_hash)
required_stream = await compute_required_flow(executions)


async def start_payment_monitoring_task(app: web.Application):
Expand Down
81 changes: 75 additions & 6 deletions src/aleph/vm/orchestrator/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
from typing import Any
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import Any, TypedDict

import aiohttp

from aleph.vm.conf import settings

logger = getLogger(__name__)

async def fetch_aggregate_settings() -> dict[str, Any] | None:

class AggregateSettingsDict(TypedDict):
compatible_gpus: list[Any]
community_wallet_address: str
community_wallet_timestamp: int


LAST_AGGREGATE_SETTINGS: AggregateSettingsDict | None = None
LAST_AGGREGATE_SETTINGS_FETCHED_AT: datetime | None = None


async def fetch_aggregate_settings() -> AggregateSettingsDict | None:
"""
Get the settings Aggregate dict from the PyAleph API Aggregate.

Expand All @@ -17,6 +31,7 @@
"""
async with aiohttp.ClientSession() as session:
url = f"{settings.API_SERVER}/api/v0/aggregates/{settings.SETTINGS_AGGREGATE_ADDRESS}.json?keys=settings"
logger.info(f"Fetching settings aggregate from {url}")
resp = await session.get(url)

# Raise an error if the request failed
Expand All @@ -27,7 +42,61 @@


async def update_aggregate_settings():
aggregate_settings = await fetch_aggregate_settings()
if aggregate_settings:
settings.COMPATIBLE_GPUS = aggregate_settings["compatible_gpus"]
settings.COMMUNITY_WALLET_ADDRESS = aggregate_settings["community_wallet_address"]
global LAST_AGGREGATE_SETTINGS # noqa: PLW0603
global LAST_AGGREGATE_SETTINGS_FETCHED_AT # noqa: PLW0603

LAST_AGGREGATE_SETTINGS = await fetch_aggregate_settings()
if (
not LAST_AGGREGATE_SETTINGS
or LAST_AGGREGATE_SETTINGS_FETCHED_AT
and datetime.now(tz=timezone.utc) - LAST_AGGREGATE_SETTINGS_FETCHED_AT > timedelta(minutes=1)
):
try:
aggregate = await fetch_aggregate_settings()
LAST_AGGREGATE_SETTINGS = aggregate
LAST_AGGREGATE_SETTINGS_FETCHED_AT = datetime.now(tz=timezone.utc)

Check warning on line 57 in src/aleph/vm/orchestrator/utils.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/utils.py#L54-L57

Added lines #L54 - L57 were not covered by tests

except Exception:
logger.exception("Failed to fetch aggregate settings")

Check warning on line 60 in src/aleph/vm/orchestrator/utils.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/utils.py#L59-L60

Added lines #L59 - L60 were not covered by tests


async def get_aggregate_settings() -> AggregateSettingsDict | None:
"""The settings aggregate is a special aggregate used to share some common settings for VM setup

Ensure the cached version is up to date and return it"""
await update_aggregate_settings()

if not LAST_AGGREGATE_SETTINGS:
logger.error("No setting aggregate")

Check warning on line 70 in src/aleph/vm/orchestrator/utils.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/utils.py#L70

Added line #L70 was not covered by tests
return LAST_AGGREGATE_SETTINGS


async def get_community_wallet_address() -> str | None:
setting_aggr = await get_aggregate_settings()
return setting_aggr and setting_aggr.get("community_wallet_address")

Check warning on line 76 in src/aleph/vm/orchestrator/utils.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/utils.py#L75-L76

Added lines #L75 - L76 were not covered by tests


async def get_community_wallet_start() -> datetime:
"""Community wallet start time.

After this timestamp. New PAYG must include a payment to the community wallet"""
setting_aggr = await get_aggregate_settings()
if setting_aggr is None or "community_wallet_timestamp" not in setting_aggr:
return datetime.now(tz=timezone.utc)

Check warning on line 85 in src/aleph/vm/orchestrator/utils.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/utils.py#L85

Added line #L85 was not covered by tests
timestamp = setting_aggr["community_wallet_timestamp"]
start_datetime = datetime.fromtimestamp(timestamp, tz=timezone.utc)
return start_datetime


async def is_after_community_wallet_start(dt: datetime | None = None) -> bool:
"""Community wallet start time"""
if not dt:
dt = datetime.now(tz=timezone.utc)
start_dt = await get_community_wallet_start()
return dt > start_dt


def get_compatible_gpus() -> list[Any]:
if not LAST_AGGREGATE_SETTINGS:
return []

Check warning on line 101 in src/aleph/vm/orchestrator/utils.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/utils.py#L101

Added line #L101 was not covered by tests
return LAST_AGGREGATE_SETTINGS["compatible_gpus"]
47 changes: 40 additions & 7 deletions src/aleph/vm/orchestrator/views/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import binascii
import contextlib
import logging
from decimal import Decimal
from hashlib import sha256
Expand All @@ -8,7 +7,6 @@
from pathlib import Path
from secrets import compare_digest
from string import Template
from typing import Optional

import aiodns
import aiohttp
Expand All @@ -26,7 +24,7 @@
from aleph.vm.controllers.firecracker.program import FileTooLargeError
from aleph.vm.hypervisors.firecracker.microvm import MicroVMFailedInitError
from aleph.vm.orchestrator import payment, status
from aleph.vm.orchestrator.chain import STREAM_CHAINS, ChainInfo
from aleph.vm.orchestrator.chain import STREAM_CHAINS
from aleph.vm.orchestrator.custom_logs import set_vm_for_logging
from aleph.vm.orchestrator.messages import try_get_message
from aleph.vm.orchestrator.metrics import get_execution_records
Expand All @@ -39,6 +37,12 @@
from aleph.vm.orchestrator.pubsub import PubSub
from aleph.vm.orchestrator.resources import Allocation, VMNotification
from aleph.vm.orchestrator.run import run_code_on_request, start_persistent_vm
from aleph.vm.orchestrator.tasks import COMMUNITY_STREAM_RATIO
from aleph.vm.orchestrator.utils import (
get_community_wallet_address,
is_after_community_wallet_start,
update_aggregate_settings,
)
from aleph.vm.orchestrator.views.host_status import (
check_dns_ipv4,
check_dns_ipv6,
Expand Down Expand Up @@ -468,6 +472,7 @@
@cors_allow_all
async def notify_allocation(request: web.Request):
"""Notify instance allocation, only used for Pay as you Go feature"""
await update_aggregate_settings()

Check warning on line 475 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L475

Added line #L475 was not covered by tests
try:
data = await request.json()
vm_notification = VMNotification.parse_obj(data)
Expand Down Expand Up @@ -526,16 +531,44 @@
raise web.HTTPPaymentRequired(reason="Empty payment stream for this instance")

required_flow: Decimal = await fetch_execution_flow_price(item_hash)

if active_flow < required_flow:
community_wallet = await get_community_wallet_address()

Check warning on line 534 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L534

Added line #L534 was not covered by tests
required_crn_stream: Decimal
required_community_stream: Decimal
if await is_after_community_wallet_start() and community_wallet:
required_crn_stream = required_flow * (1 - COMMUNITY_STREAM_RATIO)
required_community_stream = required_flow * COMMUNITY_STREAM_RATIO

Check warning on line 539 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L538-L539

Added lines #L538 - L539 were not covered by tests
else: # No community wallet payment
required_crn_stream = required_flow
required_community_stream = Decimal(0)

Check warning on line 542 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L541-L542

Added lines #L541 - L542 were not covered by tests

if active_flow < (required_crn_stream - settings.PAYMENT_BUFFER):
active_flow_per_month = active_flow * 60 * 60 * 24 * (Decimal("30.41666666666923904761904784"))
required_flow_per_month = required_flow * 60 * 60 * 24 * Decimal("30.41666666666923904761904784")
required_flow_per_month = required_crn_stream * 60 * 60 * 24 * Decimal("30.41666666666923904761904784")

Check warning on line 546 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L546

Added line #L546 was not covered by tests
return web.HTTPPaymentRequired(
reason="Insufficient payment stream",
text="Insufficient payment stream for this instance\n\n"
f"Required: {required_flow_per_month} / month (flow = {required_flow})\n"
f"Required: {required_flow_per_month} / month (flow = {required_crn_stream})\n"
f"Present: {active_flow_per_month} / month (flow = {active_flow})",
)

if community_wallet and required_community_stream:
community_flow: Decimal = await get_stream(

Check warning on line 555 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L555

Added line #L555 was not covered by tests
sender=message.sender,
receiver=community_wallet,
chain=message.content.payment.chain,
)
if community_flow < (required_community_stream - settings.PAYMENT_BUFFER):
active_flow_per_month = community_flow * 60 * 60 * 24 * (Decimal("30.41666666666923904761904784"))
required_flow_per_month = (

Check warning on line 562 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L561-L562

Added lines #L561 - L562 were not covered by tests
required_community_stream * 60 * 60 * 24 * Decimal("30.41666666666923904761904784")
)
return web.HTTPPaymentRequired(

Check warning on line 565 in src/aleph/vm/orchestrator/views/__init__.py

View check run for this annotation

Codecov / codecov/patch

src/aleph/vm/orchestrator/views/__init__.py#L565

Added line #L565 was not covered by tests
reason="Insufficient payment stream to community",
text="Insufficient payment stream for community \n\n"
f"Required: {required_flow_per_month} / month (flow = {required_community_stream})\n"
f"Present: {active_flow_per_month} / month (flow = {community_flow})\n"
f"Address: {community_wallet}",
)
else:
return web.HTTPBadRequest(reason="Invalid payment method")

Expand Down
5 changes: 3 additions & 2 deletions src/aleph/vm/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pydantic import BaseModel, Extra, Field

from aleph.vm.conf import settings
from aleph.vm.orchestrator.utils import get_compatible_gpus


class HostGPU(BaseModel):
Expand Down Expand Up @@ -60,7 +61,7 @@ def is_gpu_device_class(device_class: str) -> bool:

def get_gpu_model(device_id: str) -> bool | None:
"""Returns a GPU model name if it's found from the compatible ones."""
model_gpu_set = {gpu["device_id"]: gpu["model"] for gpu in settings.COMPATIBLE_GPUS}
model_gpu_set = {gpu["device_id"]: gpu["model"] for gpu in get_compatible_gpus()}
try:
return model_gpu_set[device_id]
except KeyError:
Expand All @@ -69,7 +70,7 @@ def get_gpu_model(device_id: str) -> bool | None:

def is_gpu_compatible(device_id: str) -> bool:
"""Checks if a GPU is compatible based on vendor and model IDs."""
compatible_gpu_set = {gpu["device_id"] for gpu in settings.COMPATIBLE_GPUS}
compatible_gpu_set = {gpu["device_id"] for gpu in get_compatible_gpus()}
return device_id in compatible_gpu_set


Expand Down
Loading