Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 88 additions & 36 deletions src/apify/_charging.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import TYPE_CHECKING, Protocol
from typing import TYPE_CHECKING, Protocol, TypedDict

from pydantic import TypeAdapter

from crawlee._utils.context import ensure_context

from apify._models import ActorRun, PricingModel
from apify._models import (
ActorRun,
FlatPricePerMonthActorPricingInfo,
FreeActorPricingInfo,
PayPerEventActorPricingInfo,
PricePerDatasetItemActorPricingInfo,
PricingModel,
)
from apify._utils import docs_group
from apify.log import logger
from apify.storages import Dataset
Expand Down Expand Up @@ -115,20 +122,12 @@ class ChargingManagerImplementation(ChargingManager):

def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> None:
self._max_total_charge_usd = configuration.max_total_charge_usd or Decimal('inf')
self._configuration = configuration
self._is_at_home = configuration.is_at_home
self._actor_run_id = configuration.actor_run_id
self._purge_charging_log_dataset = configuration.purge_on_start
self._pricing_model: PricingModel | None = None

if configuration.test_pay_per_event:
if self._is_at_home:
raise ValueError(
'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
'in a local development environment'
)

self._pricing_model = 'PAY_PER_EVENT'

self._client = client
self._charging_log_dataset: Dataset | None = None

Expand All @@ -140,37 +139,46 @@ def __init__(self, configuration: Configuration, client: ApifyClientAsync) -> No

async def __aenter__(self) -> None:
"""Initialize the charging manager - this is called by the `Actor` class and shouldn't be invoked manually."""
self.active = True

if self._is_at_home:
# Running on the Apify platform - fetch pricing info for the current run.

if self._actor_run_id is None:
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')
# Validate config
if self._configuration.test_pay_per_event and self._is_at_home:
raise ValueError(
'Using the ACTOR_TEST_PAY_PER_EVENT environment variable is only supported '
'in a local development environment'
)

run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())
if run is None:
raise RuntimeError('Actor run not found')
self.active = True

if run.pricing_info is not None:
self._pricing_model = run.pricing_info.pricing_model
# Retrieve pricing information from env vars or API
pricing_data = await self._fetch_pricing_info()
pricing_info = pricing_data['pricing_info']
charged_event_counts = pricing_data['charged_event_counts']
max_total_charge_usd = pricing_data['max_total_charge_usd']

if run.pricing_info.pricing_model == 'PAY_PER_EVENT':
for event_name, event_pricing in run.pricing_info.pricing_per_event.actor_charge_events.items():
self._pricing_info[event_name] = PricingInfoItem(
price=event_pricing.event_price_usd,
title=event_pricing.event_title,
)
# Set pricing model
if self._configuration.test_pay_per_event:
self._pricing_model = 'PAY_PER_EVENT'
else:
self._pricing_model = pricing_info.pricing_model if pricing_info else None

# Load per-event pricing information
if pricing_info and pricing_info.pricing_model == 'PAY_PER_EVENT':
for event_name, event_pricing in pricing_info.pricing_per_event.actor_charge_events.items():
self._pricing_info[event_name] = PricingInfoItem(
price=event_pricing.event_price_usd,
title=event_pricing.event_title,
)

self._max_total_charge_usd = run.options.max_total_charge_usd or self._max_total_charge_usd
self._max_total_charge_usd = max_total_charge_usd

for event_name, count in (run.charged_event_counts or {}).items():
price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
self._charging_state[event_name] = ChargingStateItem(
charge_count=count,
total_charged_amount=count * price,
)
# Load charged event counts
for event_name, count in charged_event_counts.items():
price = self._pricing_info.get(event_name, PricingInfoItem(Decimal(), title='')).price
self._charging_state[event_name] = ChargingStateItem(
charge_count=count,
total_charged_amount=count * price,
)

# Set up charging log dataset for local development
if not self._is_at_home and self._pricing_model == 'PAY_PER_EVENT':
# We are not running on the Apify platform, but PPE is enabled for testing - open a dataset that
# will contain a log of all charge calls for debugging purposes.
Expand Down Expand Up @@ -328,6 +336,38 @@ def get_charged_event_count(self, event_name: str) -> int:
def get_max_total_charge_usd(self) -> Decimal:
return self._max_total_charge_usd

async def _fetch_pricing_info(self) -> _FetchedPricingInfoDict:
"""Fetch pricing information from environment variables or API."""
# Check if pricing info is available via environment variables
if self._configuration.actor_pricing_info is not None and self._configuration.charged_event_counts is not None:
return _FetchedPricingInfoDict(
pricing_info=self._configuration.actor_pricing_info,
charged_event_counts=self._configuration.charged_event_counts,
max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'),
)

# Fall back to API call
if self._is_at_home:
if self._actor_run_id is None:
raise RuntimeError('Actor run ID not found even though the Actor is running on Apify')

run = run_validator.validate_python(await self._client.run(self._actor_run_id).get())
if run is None:
raise RuntimeError('Actor run not found')

return _FetchedPricingInfoDict(
pricing_info=run.pricing_info,
charged_event_counts=run.charged_event_counts or {},
max_total_charge_usd=run.options.max_total_charge_usd or Decimal('inf'),
)

# Local development without environment variables
return _FetchedPricingInfoDict(
pricing_info=None,
charged_event_counts={},
max_total_charge_usd=self._configuration.max_total_charge_usd or Decimal('inf'),
)


@dataclass
class ChargingStateItem:
Expand All @@ -339,3 +379,15 @@ class ChargingStateItem:
class PricingInfoItem:
price: Decimal
title: str


class _FetchedPricingInfoDict(TypedDict):
pricing_info: (
FreeActorPricingInfo
| FlatPricePerMonthActorPricingInfo
| PricePerDatasetItemActorPricingInfo
| PayPerEventActorPricingInfo
| None
)
charged_event_counts: dict[str, int]
max_total_charge_usd: Decimal
30 changes: 30 additions & 0 deletions src/apify/_configuration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import json
from datetime import datetime, timedelta
from decimal import Decimal
from logging import getLogger
Expand All @@ -14,6 +15,12 @@
from crawlee._utils.urls import validate_http_url
from crawlee.configuration import Configuration as CrawleeConfiguration

from apify._models import (
FlatPricePerMonthActorPricingInfo,
FreeActorPricingInfo,
PayPerEventActorPricingInfo,
PricePerDatasetItemActorPricingInfo,
)
from apify._utils import docs_group

logger = getLogger(__name__)
Expand Down Expand Up @@ -409,6 +416,29 @@ class Configuration(CrawleeConfiguration):
),
] = None

actor_pricing_info: Annotated[
FreeActorPricingInfo
| FlatPricePerMonthActorPricingInfo
| PricePerDatasetItemActorPricingInfo
| PayPerEventActorPricingInfo
| None,
Field(
alias='apify_actor_pricing_info',
description='JSON string with prising info of the actor',
discriminator='pricing_model',
),
BeforeValidator(lambda data: json.loads(data) if isinstance(data, str) else data if data else None),
] = None

charged_event_counts: Annotated[
dict[str, int] | None,
Field(
alias='apify_charged_actor_event_counts',
description='Counts of events that were charged for the actor',
),
BeforeValidator(lambda data: json.loads(data) if isinstance(data, str) else data if data else None),
] = None

@model_validator(mode='after')
def disable_browser_sandbox_on_platform(self) -> Self:
"""Disable the browser sandbox mode when running on the Apify platform.
Expand Down
Loading