Skip to content

Commit e29297e

Browse files
committed
[asyncify-client] Feature complete
Spore: spotted-fairy
1 parent d25a1d3 commit e29297e

File tree

8 files changed

+2327
-0
lines changed

8 files changed

+2327
-0
lines changed

src/amp/admin/async_client.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""Async HTTP client for Amp Admin API.
2+
3+
This module provides the async AdminClient class for communicating
4+
with the Amp Admin API over HTTP using asyncio and httpx.
5+
"""
6+
7+
import os
8+
from typing import Optional
9+
10+
import httpx
11+
12+
from .errors import map_error_response
13+
14+
15+
class AsyncAdminClient:
16+
"""Async HTTP client for Amp Admin API.
17+
18+
Provides access to Admin API endpoints through sub-clients for
19+
datasets, jobs, and schema operations using async/await.
20+
21+
Args:
22+
base_url: Base URL for Admin API (e.g., 'http://localhost:8080')
23+
auth_token: Optional Bearer token for authentication (highest priority)
24+
auth: If True, load auth token from ~/.amp/cache (shared with TS CLI)
25+
26+
Authentication Priority (highest to lowest):
27+
1. Explicit auth_token parameter
28+
2. AMP_AUTH_TOKEN environment variable
29+
3. auth=True - reads from ~/.amp/cache/amp_cli_auth
30+
31+
Example:
32+
>>> # Use amp auth from file
33+
>>> async with AsyncAdminClient('http://localhost:8080', auth=True) as client:
34+
... datasets = await client.datasets.list_all()
35+
>>>
36+
>>> # Use manual token
37+
>>> async with AsyncAdminClient('http://localhost:8080', auth_token='your-token') as client:
38+
... job = await client.jobs.get(123)
39+
"""
40+
41+
def __init__(self, base_url: str, auth_token: Optional[str] = None, auth: bool = False):
42+
"""Initialize async Admin API client.
43+
44+
Args:
45+
base_url: Base URL for Admin API (e.g., 'http://localhost:8080')
46+
auth_token: Optional Bearer token for authentication
47+
auth: If True, load auth token from ~/.amp/cache
48+
49+
Raises:
50+
ValueError: If both auth=True and auth_token are provided
51+
"""
52+
if auth and auth_token:
53+
raise ValueError('Cannot specify both auth=True and auth_token. Choose one authentication method.')
54+
55+
self.base_url = base_url.rstrip('/')
56+
57+
# Resolve auth token provider with priority: explicit param > env var > auth file
58+
self._get_token = None
59+
if auth_token:
60+
# Priority 1: Explicit auth_token parameter (static token)
61+
self._get_token = lambda: auth_token
62+
elif os.getenv('AMP_AUTH_TOKEN'):
63+
# Priority 2: AMP_AUTH_TOKEN environment variable (static token)
64+
env_token = os.getenv('AMP_AUTH_TOKEN')
65+
self._get_token = lambda: env_token
66+
elif auth:
67+
# Priority 3: Load from ~/.amp-cli-config/amp_cli_auth (auto-refreshing)
68+
from amp.auth import AuthService
69+
70+
auth_service = AuthService()
71+
self._get_token = auth_service.get_token # Callable that auto-refreshes
72+
73+
# Create async HTTP client (no auth header yet - will be added per-request)
74+
self._http = httpx.AsyncClient(
75+
base_url=self.base_url,
76+
timeout=30.0,
77+
follow_redirects=True,
78+
)
79+
80+
async def _request(
81+
self, method: str, path: str, json: Optional[dict] = None, params: Optional[dict] = None, **kwargs
82+
) -> httpx.Response:
83+
"""Make async HTTP request with error handling.
84+
85+
Args:
86+
method: HTTP method (GET, POST, DELETE, etc.)
87+
path: API endpoint path (e.g., '/datasets')
88+
json: Optional JSON request body
89+
params: Optional query parameters
90+
**kwargs: Additional arguments passed to httpx.request()
91+
92+
Returns:
93+
HTTP response object
94+
95+
Raises:
96+
AdminAPIError: If the API returns an error response
97+
"""
98+
# Add auth header dynamically (auto-refreshes if needed)
99+
headers = kwargs.get('headers', {})
100+
if self._get_token:
101+
headers['Authorization'] = f'Bearer {self._get_token()}'
102+
kwargs['headers'] = headers
103+
104+
response = await self._http.request(method, path, json=json, params=params, **kwargs)
105+
106+
# Handle error responses
107+
if response.status_code >= 400:
108+
try:
109+
error_data = response.json()
110+
raise map_error_response(response.status_code, error_data)
111+
except ValueError:
112+
# Response is not JSON, fall back to generic HTTP error
113+
response.raise_for_status()
114+
115+
return response
116+
117+
@property
118+
def datasets(self):
119+
"""Access async datasets client.
120+
121+
Returns:
122+
AsyncDatasetsClient for dataset operations
123+
"""
124+
from .async_datasets import AsyncDatasetsClient
125+
126+
return AsyncDatasetsClient(self)
127+
128+
@property
129+
def jobs(self):
130+
"""Access async jobs client.
131+
132+
Returns:
133+
AsyncJobsClient for job operations
134+
"""
135+
from .async_jobs import AsyncJobsClient
136+
137+
return AsyncJobsClient(self)
138+
139+
@property
140+
def schema(self):
141+
"""Access async schema client.
142+
143+
Returns:
144+
AsyncSchemaClient for schema operations
145+
"""
146+
from .async_schema import AsyncSchemaClient
147+
148+
return AsyncSchemaClient(self)
149+
150+
async def close(self):
151+
"""Close the HTTP client and release resources.
152+
153+
Example:
154+
>>> client = AsyncAdminClient('http://localhost:8080')
155+
>>> try:
156+
... datasets = await client.datasets.list_all()
157+
... finally:
158+
... await client.close()
159+
"""
160+
await self._http.aclose()
161+
162+
async def __aenter__(self):
163+
"""Async context manager entry."""
164+
return self
165+
166+
async def __aexit__(self, exc_type, exc_val, exc_tb):
167+
"""Async context manager exit."""
168+
await self.close()

0 commit comments

Comments
 (0)