Skip to content

Added endpoint for economy simulation to -api-v2 #278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion projects/policyengine-api-full/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ dependencies = [
"opentelemetry-instrumentation-sqlalchemy (>=0.51b0,<0.52)",
"pydantic-settings (>=2.7.1,<3.0.0)",
"opentelemetry-instrumentation-fastapi (>=0.51b0,<0.52)",
"policyengine-fastapi"
"policyengine-fastapi",
"google-cloud-workflows>=1.18.1"
]

[tool.uv.sources]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
from policyengine_api.fastapi.auth.jwt_decoder import JWTDecoder
from policyengine_api.fastapi.database import create_session_dep
from .household import include_all_routers

from policyengine_api.api.routers import (
economy,
)

"""
Application defined as routers completely indipendent of environment allowing it
to easily be run in whatever cloud provider container or desktop or test environment.
Expand All @@ -24,3 +27,9 @@ def initialize(app: FastAPI, engine: Engine, jwt_issuer: str, jwt_audience: str)
optional_auth=optional_auth,
auth=auth,
)

# Attaching economy simulation routes separately.
# These endpoints run macro-economic impact simulations and may trigger
# cloud workflows (GCP) or a local simulation API when running in desktop mode.
# Keeping them separate to avoid coupling with household/user CRUD routes.
app.include_router(economy.router)
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# economy_simulation.py
from fastapi import APIRouter, Query, Body, HTTPException
from typing import Literal
from policyengine_api.api.services.simulation_runner import SimulationRunner

router = APIRouter()
runner = SimulationRunner()


@router.post("/{country_id}/economy/{policy_id}/over/{baseline_policy_id}")
async def start_simulation(
country_id: str,
policy_id: int,
baseline_policy_id: int,
region: str = Query(...),
dataset: str = Query("default"),
time_period: str = Query(...),
target: Literal["general", "cliff"] = Query("general"),
version: str | None = Query(None),
reform: dict = Body(...),
baseline: dict = Body(...),
):
try:
result = await runner.start_simulation(
country_id=country_id,
reform=reform,
baseline=baseline,
region=region,
dataset=dataset,
time_period=time_period,
scope="macro",
model_version=version,
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


@router.get("/economy/result")
async def get_simulation_result(execution_id: str):
try:
result = await runner.get_simulation_result(execution_id)
print("SKLOGS: get api called")
return result
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import os
import json
import uuid
import asyncio
import httpx

from typing import Literal, Any
from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
from google.cloud.workflows.executions_v1.types import Execution
from google.protobuf.json_format import MessageToDict
from policyengine_api_full.settings import get_settings, Environment

class SimulationRunner:
def __init__(self):
# self.is_desktop = os.getenv("PE_MODE") == "desktop"
settings = get_settings()
self.is_desktop = settings.environment == Environment.DESKTOP
print(f"SKLOGS ENVIRONMENT: {get_settings().environment}")

if not self.is_desktop:
self.project = "prod-api-v2-c4d5"
self.location = "us-central1"
self.workflow = "simulation-workflow"
self.execution_client = executions_v1.ExecutionsClient()
self.workflows_client = workflows_v1.WorkflowsClient()
else:
self.desktop_url = os.getenv(
"SIMULATION_LOCAL_URL",
"http://localhost:8081/simulate/economy/comparison",
)
self._simulations: dict[str, dict] = {}
self._lock = asyncio.Lock() # To protect self._simulations

def _build_payload(
self,
country_id: str,
reform: dict,
baseline: dict,
region: str,
dataset: str,
time_period: str,
scope: Literal["macro", "household"] = "macro",
model_version: str | None = None,
data_version: str | None = None,
) -> dict[str, Any]:
return {
"country": country_id,
"scope": scope,
"reform": reform,
"baseline": baseline,
"time_period": time_period,
"region": region,
"data": dataset,
"model_version": model_version,
"data_version": data_version,
}

async def start_simulation(
self,
country_id: str,
reform: dict,
baseline: dict,
region: str,
dataset: str,
time_period: str,
scope: Literal["macro", "household"] = "macro",
model_version: str | None = None,
data_version: str | None = None,
) -> dict:
payload = self._build_payload(
country_id,
reform,
baseline,
region,
dataset,
time_period,
scope,
model_version,
data_version,
)

if self.is_desktop:
async with httpx.AsyncClient() as client:
response = await client.post(self.desktop_url, json=payload)
response.raise_for_status()
result = response.json()

execution_id = f"desktop-{uuid.uuid4()}"
async with self._lock:
self._simulations[execution_id] = {
"status": "SUCCEEDED",
"result": result,
"error": None,
}
return {"execution_id": execution_id}

else:
# Use asyncio.to_thread for blocking I/O
def create_execution():
workflow_path = self.workflows_client.workflow_path(
self.project, self.location, self.workflow
)
return self.execution_client.create_execution(
parent=workflow_path,
execution=Execution(argument=json.dumps(payload)),
)

execution = await asyncio.to_thread(create_execution)
return {"execution_id": execution.name}

async def get_simulation_result(self, execution_id: str) -> dict:
if self.is_desktop:
print("SKLOGS: in desktop mode")
async with self._lock:
if execution_id not in self._simulations:
raise ValueError(f"Unknown execution ID: {execution_id}")
simulation = self._simulations[execution_id]

return {
"execution_id": execution_id,
"status": simulation["status"],
"result": simulation["result"],
"error": simulation["error"],
}

else:
print("SKLOGS: in prod mode")

def get_execution():
return self.execution_client.get_execution(name=execution_id)

execution = await asyncio.to_thread(get_execution)
status = execution.state.name

response = {
"execution_id": execution_id,
"status": status,
}

if status == "SUCCEEDED":
response["result"] = json.loads(execution.result or "{}")
response["error"] = None
elif status == "FAILED":
try:
error_dict = MessageToDict(execution.error)
response["error"] = error_dict.get("message", str(execution.error))
except Exception:
response["error"] = str(execution.error)
response["result"] = None
else:
response["result"] = None
response["error"] = None

return response
17 changes: 17 additions & 0 deletions projects/policyengine-api-full/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions server_common.mk
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ deploy:
echo "Building ${SERVICE_NAME} docker image"
cd ../../ && gcloud builds submit --region=${REGION} --substitutions=_IMAGE_TAG=${REGION}-docker.pkg.dev/${PROJECT_ID}/${REPO}/${SERVICE_NAME}:${TAG},_SERVICE_NAME=${SERVICE_NAME},_MODULE_NAME=${MODULE_NAME},_WORKER_COUNT=${WORKER_COUNT} ${BUILD_ARGS}

# Always run with a single worker in dev mode to match local desktop environment.
# Sets WORKER_COUNT=1 just for this target.
# Also set ENVIRONMENT=desktop in src/.env file for the app to pick up the correct config.
dev: WORKER_COUNT=1
dev:
echo "Running ${SERVICE_NAME} dev instance"
cd src && uv run uvicorn ${MODULE_NAME}:app --reload --port ${DEV_PORT} --workers ${WORKER_COUNT}
Loading