Skip to content

Commit

Permalink
Feature: Control of balance for instances (#462)
Browse files Browse the repository at this point in the history
Added functionality to check the current balance and compute costs for instances.
  • Loading branch information
1yam authored Aug 22, 2023
1 parent 1181c3b commit 4c50268
Show file tree
Hide file tree
Showing 9 changed files with 482 additions and 14 deletions.
174 changes: 174 additions & 0 deletions deployment/migrations/versions/0018_7bcb8e5fe186_fix_vm_cost_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""fix VM cost view
Revision ID: 7bcb8e5fe186
Revises: f9fa39b6bdef
Create Date: 2023-08-04 15:14:39.082370
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '7bcb8e5fe186'
down_revision = 'f9fa39b6bdef'
branch_labels = None
depends_on = None

def upgrade() -> None:
op.execute(
"""
create or replace view vm_volumes_files_view as
SELECT volume.program_hash AS vm_hash,
volume.ref,
volume.use_latest,
'code_volume'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM program_code_volumes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
UNION
SELECT volume.program_hash AS vm_hash,
volume.ref,
volume.use_latest,
'data_volume'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM program_data_volumes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
UNION
SELECT volume.program_hash AS vm_hash,
volume.ref,
volume.use_latest,
'runtime'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM program_runtimes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
UNION
SELECT volume.vm_hash,
volume.ref,
volume.use_latest,
'machine_volume'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM vm_machine_volumes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
"""
)
op.execute(
"""
create or replace view costs_view as
SELECT COALESCE(vm_prices.owner, storage.owner) AS address,
vm_prices.total_vm_cost,
sc.total_storage_cost,
tc.total_cost
FROM (SELECT vm_costs_view.owner,
sum(vm_costs_view.total_price) AS total_vm_cost
FROM vm_costs_view
GROUP BY vm_costs_view.owner) vm_prices
FULL JOIN (SELECT file_pins.owner,
sum(f.size) AS storage_size
FROM file_pins
JOIN files f ON file_pins.file_hash::text = f.hash::text
WHERE file_pins.owner IS NOT NULL
GROUP BY file_pins.owner) storage ON vm_prices.owner::text = storage.owner::text,
LATERAL ( SELECT 3::numeric * storage.storage_size / 1000000::numeric AS total_storage_cost) sc,
LATERAL ( SELECT COALESCE(vm_prices.total_vm_cost, 0::double precision) +
COALESCE(sc.total_storage_cost, 0::numeric)::double precision AS total_cost) tc
"""
)
op.execute(
"""
create or replace view vm_costs_view as
SELECT vm_versions.vm_hash,
vm_versions.owner,
vms.resources_vcpus,
vms.resources_memory,
file_volumes_size.file_volumes_size,
other_volumes_size.other_volumes_size,
used_disk.required_disk_space,
cu.compute_units_required,
bcp.base_compute_unit_price,
m.compute_unit_price_multiplier,
cpm.compute_unit_price,
free_disk.included_disk_space,
additional_disk.additional_disk_space,
adp.disk_price,
tp.total_price
FROM vm_versions
JOIN vms ON vm_versions.current_version::text = vms.item_hash::text
JOIN (SELECT volume.vm_hash,
sum(files.size) AS file_volumes_size
FROM vm_volumes_files_view volume
LEFT JOIN files ON volume.volume_to_use::text = files.hash::text
GROUP BY volume.vm_hash) file_volumes_size
ON vm_versions.current_version::text = file_volumes_size.vm_hash::text
JOIN (SELECT instance_rootfs.instance_hash,
instance_rootfs.size_mib::bigint * 1024 * 1024 AS rootfs_size
FROM instance_rootfs) rootfs_size ON vm_versions.vm_hash::text = rootfs_size.instance_hash::text
JOIN (SELECT vm_machine_volumes.vm_hash,
sum(vm_machine_volumes.size_mib) * 1024 * 1024 AS other_volumes_size
FROM vm_machine_volumes
GROUP BY vm_machine_volumes.vm_hash) other_volumes_size
ON vm_versions.current_version::text = other_volumes_size.vm_hash::text,
LATERAL ( SELECT file_volumes_size.file_volumes_size +
other_volumes_size.other_volumes_size::numeric AS required_disk_space) used_disk,
LATERAL ( SELECT ceil(GREATEST(ceil((vms.resources_vcpus / 1)::double precision),
(vms.resources_memory / 2000)::double precision)) AS compute_units_required) cu,
LATERAL ( SELECT CASE
WHEN COALESCE(vms.persistent, true)
THEN '21474836480'::bigint::double precision * cu.compute_units_required
ELSE 2147483648::double precision * cu.compute_units_required
END AS included_disk_space) free_disk,
LATERAL ( SELECT GREATEST((file_volumes_size.file_volumes_size + rootfs_size.rootfs_size::numeric +
other_volumes_size.other_volumes_size::numeric)::double precision -
free_disk.included_disk_space,
0::double precision) AS additional_disk_space) additional_disk,
LATERAL ( SELECT CASE
WHEN COALESCE(vms.persistent, true) THEN 2000
ELSE 200
END AS base_compute_unit_price) bcp,
LATERAL ( SELECT 1 + vms.environment_internet::integer AS compute_unit_price_multiplier) m,
LATERAL ( SELECT cu.compute_units_required * m.compute_unit_price_multiplier::double precision *
bcp.base_compute_unit_price::double precision *
m.compute_unit_price_multiplier::double precision AS compute_unit_price) cpm,
LATERAL ( SELECT additional_disk.additional_disk_space * 20::double precision /
(1024 * 1024)::double precision AS disk_price) adp,
LATERAL ( SELECT cpm.compute_unit_price + adp.disk_price AS total_price) tp
"""
)
op.execute(
"""
INSERT INTO error_codes(code, description) VALUES
(5, 'Insufficient balance')
"""
)


def downgrade() -> None:
op.execute("drop view costs_view")
op.execute("drop view vm_costs_view")
op.execute("drop view vm_volumes_files_view")
15 changes: 13 additions & 2 deletions src/aleph/db/accessors/vms.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as dt
from typing import Optional, Iterable

from sqlalchemy import delete, func, select
from decimal import Decimal
from sqlalchemy import delete, func, select, text
from sqlalchemy.dialects.postgresql import insert

from aleph.db.models.vms import (
Expand Down Expand Up @@ -113,3 +113,14 @@ def refresh_vm_version(session: DbSession, vm_hash: str) -> None:
)
session.execute(delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash))
session.execute(upsert_stmt)


def get_total_cost_for_address(session: DbSession, address: str) -> Decimal:
select_stmt = (
select(func.sum(text("total_cost")))
.select_from(text("public.costs_view"))
.where(text("address = :address"))
).params(address=address)

total_cost = session.execute(select_stmt).scalar()
return Decimal(total_cost) if total_cost is not None else Decimal(0)
11 changes: 11 additions & 0 deletions src/aleph/handlers/content/content_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ async def process(self, session: DbSession, messages: List[MessageDb]) -> None:
"""
pass

async def check_balance(self, session: DbSession, message: MessageDb) -> None:
"""
Checks whether the user has enough Aleph tokens to process the message.
Raises InsufficientBalanceException if the balance of the user is too low.
:param session: DB session.
:param message: Message being processed.
"""
pass

async def check_dependencies(self, session: DbSession, message: MessageDb) -> None:
"""
Check dependencies of a message.
Expand Down
45 changes: 44 additions & 1 deletion src/aleph/handlers/content/vm.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import logging
import math
from typing import List, Set, overload, Protocol, Optional

from aleph_message.models import ProgramContent, ExecutableContent, InstanceContent
from aleph_message.models import (
ProgramContent,
ExecutableContent,
InstanceContent,
MessageType,
)
from aleph_message.models.execution.volume import (
AbstractVolume,
ImmutableVolume,
EphemeralVolume,
PersistentVolume,
ParentVolume,
)
from decimal import Decimal

from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.files import (
find_file_tags,
find_file_pins,
Expand All @@ -23,6 +31,7 @@
delete_vm_updates,
refresh_vm_version,
is_vm_amend_allowed,
get_total_cost_for_address,
)
from aleph.db.models import (
MessageDb,
Expand All @@ -39,8 +48,10 @@
RootfsVolumeDb,
VmBaseDb,
StoredFileDb,
FilePinDb,
)
from aleph.handlers.content.content_handler import ContentHandler
from aleph.services.cost import compute_cost
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.db_session import DbSession
from aleph.types.files import FileTag
Expand All @@ -52,6 +63,7 @@
VmUpdateNotAllowed,
VmCannotUpdateUpdate,
VmVolumeTooSmall,
InsufficientBalanceException,
)
from aleph.types.vms import VmVersion

Expand All @@ -67,6 +79,12 @@ def _get_vm_content(message: MessageDb) -> ExecutableContent:
return content


from aleph_message.models.execution.program import (
MachineType,
ProgramContent,
)


@overload
def _map_content_to_db_model(item_hash: str, content: InstanceContent) -> VmInstanceDb:
...
Expand Down Expand Up @@ -289,6 +307,9 @@ class HasParent(Protocol):
)





class VmMessageHandler(ContentHandler):
"""
Handles both PROGRAM and INSTANCE messages.
Expand All @@ -298,6 +319,27 @@ class VmMessageHandler(ContentHandler):
"""

async def check_balance(self, session: DbSession, message: MessageDb) -> None:
if message.type != MessageType.instance:
return
content = _get_vm_content(message)
if isinstance(content, ProgramContent):
return

required_tokens = compute_cost(session=session, content=content)
current_balance = (
get_total_balance(address=content.address, session=session) or 0
)
current_instance_costs = get_total_cost_for_address(
session=session, address=content.address
)

if current_balance < current_instance_costs + required_tokens:
raise InsufficientBalanceException(
balance=Decimal(current_balance),
required_balance=current_instance_costs + required_tokens,
)

async def check_dependencies(self, session: DbSession, message: MessageDb) -> None:
content = _get_vm_content(message)

Expand Down Expand Up @@ -358,3 +400,4 @@ async def forget_message(self, session: DbSession, message: MessageDb) -> Set[st
refresh_vm_version(session=session, vm_hash=message.item_hash)

return update_hashes

2 changes: 1 addition & 1 deletion src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ async def add_pending_message(
tx_hash: Optional[str] = None,
check_message: bool = True,
) -> Optional[PendingMessageDb]:

# TODO: this implementation is just messy, improve it.
with self.session_factory() as session:
try:
Expand Down Expand Up @@ -318,6 +317,7 @@ async def process(
content_handler = self.get_content_handler(message.type)
await content_handler.check_dependencies(session=session, message=message)
await self.check_permissions(session=session, message=message)
await content_handler.check_balance(session=session, message=message)
await self.insert_message(
session=session, pending_message=pending_message, message=message
)
Expand Down
Loading

0 comments on commit 4c50268

Please sign in to comment.