Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Control of balance for instances #462

Merged
merged 29 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
64b4341
Do not merge: fix cost view migration
odesenfans Jul 27, 2023
6d21807
Feature: Control of balance for Instances
1yam Aug 3, 2023
4a42b7a
Fix: price calculations in db views
1yam Aug 7, 2023
5dce8d1
Fix: return type Decimal
1yam Aug 7, 2023
0035df4
Refactor: rename get_extra_storage to get_volume_size
1yam Aug 7, 2023
5d19d62
Refactor: functions get_volume_size
1yam Aug 7, 2023
c58f02d
Fix: mypy
1yam Aug 7, 2023
00be932
Add new test with no balance
1yam Aug 7, 2023
7681e22
Fix: Add ERROR code 5 (Insufficient balances) in db migrations
1yam Aug 8, 2023
10404a5
Fix: Unit test
1yam Aug 8, 2023
db7f847
Test
1yam Aug 8, 2023
eb2c7ea
Fix: Launch control balance only on message post
1yam Aug 8, 2023
67cf926
Fix: Test error with no balance
1yam Aug 8, 2023
bb3e11e
Fix: No balance test
1yam Aug 8, 2023
2cff692
Fix: control message type
1yam Aug 8, 2023
9f1738a
Fix: re add __version__
1yam Aug 8, 2023
5a43c29
Refractor: get_volum_size use get_message_file_pin functions instead …
1yam Aug 8, 2023
64d00b0
Fix: mypy error
1yam Aug 8, 2023
0175169
Internal: reenable AVAX signature unit tests (#461)
odesenfans Jul 30, 2023
b326c4f
Update src/aleph/handlers/content/content_handler.py
1yam Aug 8, 2023
68eb9e0
Merge branch '1yam-vm-balance-check' of github.com:1yam/pyaleph into …
1yam Aug 8, 2023
9dfc207
Fix: type error functions return Decimal instead of int
1yam Aug 8, 2023
377ae1a
Refactor: move cost calcul & test for cost
1yam Aug 8, 2023
6a46672
Refactor type : ExecutableContent instead of ExecutableContent
1yam Aug 8, 2023
fbf68a5
Fix
1yam Aug 8, 2023
c3225d7
Fix: formatting issues
odesenfans Aug 21, 2023
f2af477
Fix: compute included storage in GiB instead of GB
odesenfans Aug 21, 2023
b2334dd
Fix: insufficient balance error format
odesenfans Aug 21, 2023
98b8129
mib
odesenfans Aug 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions deployment/migrations/versions/0018_7bcb8e5fe186_fix_vm_cost_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
"""fix VM cost view

Revision ID: 7bcb8e5fe186
Revises: f9fa39b6bdef
Create Date: 2023-08-04 15:14:39.082370

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '7bcb8e5fe186'
down_revision = 'f9fa39b6bdef'
branch_labels = None
depends_on = None

def upgrade() -> None:
op.execute(
"""
create or replace view vm_volumes_files_view as
SELECT volume.program_hash AS vm_hash,
volume.ref,
volume.use_latest,
'code_volume'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM program_code_volumes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
UNION
SELECT volume.program_hash AS vm_hash,
volume.ref,
volume.use_latest,
'data_volume'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM program_data_volumes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
UNION
SELECT volume.program_hash AS vm_hash,
volume.ref,
volume.use_latest,
'runtime'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM program_runtimes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
UNION
SELECT volume.vm_hash,
volume.ref,
volume.use_latest,
'machine_volume'::text AS type,
tags.file_hash AS latest,
originals.file_hash AS original,
CASE
WHEN volume.use_latest THEN tags.file_hash
ELSE originals.file_hash
END AS volume_to_use
FROM vm_machine_volumes volume
LEFT JOIN file_tags tags ON volume.ref::text = tags.tag::text
JOIN file_pins originals ON volume.ref::text = originals.item_hash::text
"""
)
op.execute(
"""
create or replace view costs_view as
SELECT COALESCE(vm_prices.owner, storage.owner) AS address,
vm_prices.total_vm_cost,
sc.total_storage_cost,
tc.total_cost
FROM (SELECT vm_costs_view.owner,
sum(vm_costs_view.total_price) AS total_vm_cost
FROM vm_costs_view
GROUP BY vm_costs_view.owner) vm_prices
FULL JOIN (SELECT file_pins.owner,
sum(f.size) AS storage_size
FROM file_pins
JOIN files f ON file_pins.file_hash::text = f.hash::text
WHERE file_pins.owner IS NOT NULL
GROUP BY file_pins.owner) storage ON vm_prices.owner::text = storage.owner::text,
LATERAL ( SELECT 3::numeric * storage.storage_size / 1000000::numeric AS total_storage_cost) sc,
LATERAL ( SELECT COALESCE(vm_prices.total_vm_cost, 0::double precision) +
COALESCE(sc.total_storage_cost, 0::numeric)::double precision AS total_cost) tc
"""
)
op.execute(
"""
create or replace view vm_costs_view as
SELECT vm_versions.vm_hash,
vm_versions.owner,
vms.resources_vcpus,
vms.resources_memory,
file_volumes_size.file_volumes_size,
other_volumes_size.other_volumes_size,
used_disk.required_disk_space,
cu.compute_units_required,
bcp.base_compute_unit_price,
m.compute_unit_price_multiplier,
cpm.compute_unit_price,
free_disk.included_disk_space,
additional_disk.additional_disk_space,
adp.disk_price,
tp.total_price
FROM vm_versions
JOIN vms ON vm_versions.current_version::text = vms.item_hash::text
JOIN (SELECT volume.vm_hash,
sum(files.size) AS file_volumes_size
FROM vm_volumes_files_view volume
LEFT JOIN files ON volume.volume_to_use::text = files.hash::text
GROUP BY volume.vm_hash) file_volumes_size
ON vm_versions.current_version::text = file_volumes_size.vm_hash::text
JOIN (SELECT instance_rootfs.instance_hash,
instance_rootfs.size_mib::bigint * 1024 * 1024 AS rootfs_size
FROM instance_rootfs) rootfs_size ON vm_versions.vm_hash::text = rootfs_size.instance_hash::text
JOIN (SELECT vm_machine_volumes.vm_hash,
sum(vm_machine_volumes.size_mib) * 1024 * 1024 AS other_volumes_size
FROM vm_machine_volumes
GROUP BY vm_machine_volumes.vm_hash) other_volumes_size
ON vm_versions.current_version::text = other_volumes_size.vm_hash::text,
LATERAL ( SELECT file_volumes_size.file_volumes_size +
other_volumes_size.other_volumes_size::numeric AS required_disk_space) used_disk,
LATERAL ( SELECT ceil(GREATEST(ceil((vms.resources_vcpus / 1)::double precision),
(vms.resources_memory / 2000)::double precision)) AS compute_units_required) cu,
LATERAL ( SELECT CASE
WHEN COALESCE(vms.persistent, true)
THEN '21474836480'::bigint::double precision * cu.compute_units_required
ELSE 2147483648::double precision * cu.compute_units_required
END AS included_disk_space) free_disk,
LATERAL ( SELECT GREATEST((file_volumes_size.file_volumes_size + rootfs_size.rootfs_size::numeric +
other_volumes_size.other_volumes_size::numeric)::double precision -
free_disk.included_disk_space,
0::double precision) AS additional_disk_space) additional_disk,
LATERAL ( SELECT CASE
WHEN COALESCE(vms.persistent, true) THEN 2000
ELSE 200
END AS base_compute_unit_price) bcp,
LATERAL ( SELECT 1 + vms.environment_internet::integer AS compute_unit_price_multiplier) m,
LATERAL ( SELECT cu.compute_units_required * m.compute_unit_price_multiplier::double precision *
bcp.base_compute_unit_price::double precision *
m.compute_unit_price_multiplier::double precision AS compute_unit_price) cpm,
LATERAL ( SELECT additional_disk.additional_disk_space * 20::double precision /
(1024 * 1024)::double precision AS disk_price) adp,
LATERAL ( SELECT cpm.compute_unit_price + adp.disk_price AS total_price) tp


"""
)
op.execute(
"""
INSERT INTO error_codes(code, description) VALUES
(5, 'Insufficient balance')
"""
)


def downgrade() -> None:
op.execute("drop view costs_view")
op.execute("drop view vm_costs_view")
op.execute("drop view vm_volumes_files_view")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not really what the downgrade should do. A downgrade should bring the view back to its previous version. I'll fix it myself, just FYI.

15 changes: 13 additions & 2 deletions src/aleph/db/accessors/vms.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as dt
from typing import Optional, Iterable

from sqlalchemy import delete, func, select
from decimal import Decimal
from sqlalchemy import delete, func, select, text
from sqlalchemy.dialects.postgresql import insert

from aleph.db.models.vms import (
Expand Down Expand Up @@ -113,3 +113,14 @@ def refresh_vm_version(session: DbSession, vm_hash: str) -> None:
)
session.execute(delete(VmVersionDb).where(VmVersionDb.vm_hash == vm_hash))
session.execute(upsert_stmt)


def get_total_cost_for_address(session: DbSession, address: str) -> Decimal:
select_stmt = (
select(func.sum(text("total_cost")))
.select_from(text("public.costs_view"))
.where(text("address = :address"))
).params(address=address)

total_cost = session.execute(select_stmt).scalar()
return Decimal(total_cost) if total_cost is not None else Decimal(0)
11 changes: 11 additions & 0 deletions src/aleph/handlers/content/content_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ async def process(self, session: DbSession, messages: List[MessageDb]) -> None:
"""
pass

async def check_balance(self, session: DbSession, message: MessageDb) -> None:
"""
Checks whether the user has enough Aleph tokens to process the message.

Raises InsufficientBalanceException if the balance of the user is too low.

:param session: DB session.
:param message: Message being processed.
"""
pass

async def check_dependencies(self, session: DbSession, message: MessageDb) -> None:
"""
Check dependencies of a message.
Expand Down
45 changes: 44 additions & 1 deletion src/aleph/handlers/content/vm.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import logging
import math
from typing import List, Set, overload, Protocol, Optional

from aleph_message.models import ProgramContent, ExecutableContent, InstanceContent
from aleph_message.models import (
ProgramContent,
ExecutableContent,
InstanceContent,
MessageType,
)
from aleph_message.models.execution.volume import (
AbstractVolume,
ImmutableVolume,
EphemeralVolume,
PersistentVolume,
ParentVolume,
)
from decimal import Decimal

from aleph.db.accessors.balances import get_total_balance
from aleph.db.accessors.files import (
find_file_tags,
find_file_pins,
Expand All @@ -23,6 +31,7 @@
delete_vm_updates,
refresh_vm_version,
is_vm_amend_allowed,
get_total_cost_for_address,
)
from aleph.db.models import (
MessageDb,
Expand All @@ -39,8 +48,10 @@
RootfsVolumeDb,
VmBaseDb,
StoredFileDb,
FilePinDb,
)
from aleph.handlers.content.content_handler import ContentHandler
from aleph.services.cost import compute_cost
from aleph.toolkit.timestamp import timestamp_to_datetime
from aleph.types.db_session import DbSession
from aleph.types.files import FileTag
Expand All @@ -52,6 +63,7 @@
VmUpdateNotAllowed,
VmCannotUpdateUpdate,
VmVolumeTooSmall,
InsufficientBalanceException,
)
from aleph.types.vms import VmVersion

Expand All @@ -67,6 +79,12 @@ def _get_vm_content(message: MessageDb) -> ExecutableContent:
return content


from aleph_message.models.execution.program import (
MachineType,
ProgramContent,
)


@overload
def _map_content_to_db_model(item_hash: str, content: InstanceContent) -> VmInstanceDb:
...
Expand Down Expand Up @@ -289,6 +307,9 @@ class HasParent(Protocol):
)





class VmMessageHandler(ContentHandler):
"""
Handles both PROGRAM and INSTANCE messages.
Expand All @@ -298,6 +319,27 @@ class VmMessageHandler(ContentHandler):

"""

async def check_balance(self, session: DbSession, message: MessageDb) -> None:
if message.type != MessageType.instance:
return
content = _get_vm_content(message)
if isinstance(content, ProgramContent):
return

required_tokens = compute_cost(session=session, content=content)
current_balance = (
get_total_balance(address=content.address, session=session) or 0
)
current_instance_costs = get_total_cost_for_address(
session=session, address=content.address
)

if current_balance < current_instance_costs + required_tokens:
raise InsufficientBalanceException(
balance=Decimal(current_balance),
required_balance=current_instance_costs + required_tokens,
)

async def check_dependencies(self, session: DbSession, message: MessageDb) -> None:
content = _get_vm_content(message)

Expand Down Expand Up @@ -358,3 +400,4 @@ async def forget_message(self, session: DbSession, message: MessageDb) -> Set[st
refresh_vm_version(session=session, vm_hash=message.item_hash)

return update_hashes

2 changes: 1 addition & 1 deletion src/aleph/handlers/message_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ async def add_pending_message(
tx_hash: Optional[str] = None,
check_message: bool = True,
) -> Optional[PendingMessageDb]:

# TODO: this implementation is just messy, improve it.
with self.session_factory() as session:
try:
Expand Down Expand Up @@ -318,6 +317,7 @@ async def process(
content_handler = self.get_content_handler(message.type)
await content_handler.check_dependencies(session=session, message=message)
await self.check_permissions(session=session, message=message)
await content_handler.check_balance(session=session, message=message)
await self.insert_message(
session=session, pending_message=pending_message, message=message
)
Expand Down
Loading
Loading