Skip to content

Commit d24affe

Browse files
karlwaldmanclaude
andcommitted
feat: market-brief + agent-subscription methods (sync+async, v1.9.0)
Add SDK support for the now-live #3245 endpoints on both the sync and async clients: - client.market_brief(codes, narrative=False) -> MarketBrief - client.subscriptions.list() / .create() / .delete() / .events() with friendly interval mapping ("5m"/"1h"/"daily" -> interval_seconds) and X-OPA-Source / X-OPA-Tool attribution headers (default source "sdk-python"). Mirrored on AsyncOilPriceAPI. New Pydantic models (MarketBrief, MarketBriefCommodity, MarketBriefForecast, Subscription, SubscriptionEvent) plus a SubscriptionEventsPage helper, all exported from the package root. Tests: mocked sync + async unit tests, interval-mapping coverage, and a read-only live integration test (marked `live`, skips without OILPRICEAPI_TEST_KEY, no prod writes). Version bumped 1.8.1 -> 1.9.0. Gates: mypy clean, ruff clean, pytest 311 passed (cov 59%). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 785b385 commit d24affe

14 files changed

Lines changed: 1021 additions & 5 deletions

README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,75 @@ result = client.data_sources.test("123")
426426
print(f"Test status: {result['status']}")
427427
```
428428

429+
### Market Brief (New in v1.9.0)
430+
431+
Get a multi-commodity structured summary (latest price, 24h change, and a
432+
1-month forecast per commodity) in a single request. Pass `narrative=True` to
433+
also receive a natural-language summary.
434+
435+
```python
436+
brief = client.market_brief(["BRENT_CRUDE_USD", "WTI_USD"], narrative=True)
437+
438+
print(f"As of: {brief.as_of}")
439+
for c in brief.commodities:
440+
print(f"{c.code}: ${c.price} ({c.change_24h_pct:+.2f}% 24h)")
441+
if c.forecast_1m:
442+
print(f" 1m forecast: {c.forecast_1m.point} "
443+
f"[{c.forecast_1m.low}{c.forecast_1m.high}] ({c.forecast_1m.confidence})")
444+
445+
if brief.narrative:
446+
print(brief.narrative)
447+
```
448+
449+
### Agent Subscriptions (New in v1.9.0)
450+
451+
Create persistent "watches" that periodically evaluate commodities and emit
452+
events your agent can poll for. The `interval` accepts a friendly string
453+
(`"5m"`, `"1h"`, `"daily"`) or raw seconds.
454+
455+
```python
456+
# Create a subscription
457+
sub = client.subscriptions.create(
458+
codes=["BRENT_CRUDE_USD"],
459+
interval="5m", # also accepts "1h", "daily", or 300
460+
name="Brent watch",
461+
)
462+
print(sub.id, sub.interval_seconds) # -> 300
463+
464+
# List subscriptions
465+
for s in client.subscriptions.list():
466+
print(s.name, s.codes, s.status)
467+
468+
# Poll for events using a cursor
469+
page = client.subscriptions.events(since=0)
470+
for event in page:
471+
print(event.type, event.code)
472+
# Persist page.cursor and pass it as `since` on the next poll
473+
next_page = client.subscriptions.events(since=page.cursor)
474+
475+
# Delete a subscription
476+
client.subscriptions.delete(sub.id)
477+
```
478+
479+
Attribution headers are sent automatically (`X-OPA-Source` defaults to
480+
`sdk-python`). MCP tools can override them:
481+
482+
```python
483+
client.subscriptions.create(
484+
["WTI_USD"], interval="1h", source="mcp", tool="claude-desktop"
485+
)
486+
```
487+
488+
All of the above is mirrored on the async client:
489+
490+
```python
491+
async with AsyncOilPriceAPI() as client:
492+
brief = await client.market_brief(["BRENT_CRUDE_USD"])
493+
sub = await client.subscriptions.create(["WTI_USD"], interval="daily")
494+
page = await client.subscriptions.events(since=0)
495+
await client.subscriptions.delete(sub.id)
496+
```
497+
429498
## 📊 Features
430499

431500
-**Simple API** - Intuitive methods for all endpoints
@@ -444,6 +513,8 @@ print(f"Test status: {result['status']}")
444513
-**Energy Intelligence** - EIA data, OPEC production, drilling productivity
445514
-**Data Quality** - Real-time quality monitoring and reporting
446515
-**Data Sources** - Connector management with health checks and logging
516+
-**Market Brief** - Multi-commodity structured + narrative summary in one call 🧠
517+
-**Agent Subscriptions** - Persistent watches + event polling for AI agents 🤖
447518
-**Async Support** - High-performance async client
448519
-**WebSocket Streaming** - Real-time price stream via ActionCable (Professional+)
449520
-**Smart Caching** - Reduce API calls automatically

oilpriceapi/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,15 @@
2828
DieselPrice,
2929
DieselStation,
3030
DieselStationsResponse,
31+
MarketBrief,
32+
MarketBriefCommodity,
33+
MarketBriefForecast,
3134
PriceAlert,
35+
Subscription,
36+
SubscriptionEvent,
3237
WebhookTestResponse,
3338
)
39+
from oilpriceapi.resources.subscriptions import SubscriptionEventsPage
3440
from oilpriceapi.streaming import (
3541
PriceStream,
3642
PriceUpdate,
@@ -56,6 +62,12 @@
5662
"PriceAlert",
5763
"WebhookTestResponse",
5864
"DataConnectorPrice",
65+
"MarketBrief",
66+
"MarketBriefCommodity",
67+
"MarketBriefForecast",
68+
"Subscription",
69+
"SubscriptionEvent",
70+
"SubscriptionEventsPage",
5971
"PriceStream",
6072
"StreamUpdate",
6173
"PriceUpdate",
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""
2+
Shared helpers for the agent-subscriptions + market-brief features (#3245).
3+
4+
Keeps the sync and async resources behaving identically: friendly interval
5+
parsing, attribution header construction, and response unwrapping.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
from typing import Any, Dict, List, Optional, Union
11+
12+
# Default attribution source stamped on subscriptions created via this SDK.
13+
DEFAULT_SOURCE = "sdk-python"
14+
15+
# Friendly interval aliases → seconds. Anything else falls through to the
16+
# numeric parser below (e.g. "30s", "15m", "2h", "1d", or a raw int).
17+
_INTERVAL_ALIASES: Dict[str, int] = {
18+
"1m": 60,
19+
"5m": 300,
20+
"10m": 600,
21+
"15m": 900,
22+
"30m": 1800,
23+
"1h": 3600,
24+
"hourly": 3600,
25+
"6h": 21600,
26+
"12h": 43200,
27+
"1d": 86400,
28+
"daily": 86400,
29+
}
30+
31+
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}
32+
33+
34+
def normalize_interval(interval: Union[str, int]) -> int:
35+
"""Convert a friendly interval into ``interval_seconds``.
36+
37+
Accepts an int (returned as-is), a named alias ("5m", "1h", "daily"), or a
38+
``<number><unit>`` string where unit is one of s/m/h/d (e.g. "30s", "2h").
39+
40+
Raises:
41+
ValueError: If the interval cannot be parsed or is non-positive.
42+
"""
43+
if isinstance(interval, bool): # bool is an int subclass; reject explicitly
44+
raise ValueError(f"Invalid interval: {interval!r}")
45+
if isinstance(interval, int):
46+
if interval <= 0:
47+
raise ValueError(f"interval_seconds must be positive, got {interval}")
48+
return interval
49+
50+
if not isinstance(interval, str):
51+
raise ValueError(f"Invalid interval type: {type(interval).__name__}")
52+
53+
key = interval.strip().lower()
54+
if key in _INTERVAL_ALIASES:
55+
return _INTERVAL_ALIASES[key]
56+
57+
# Plain integer string e.g. "300".
58+
if key.isdigit():
59+
seconds = int(key)
60+
if seconds <= 0:
61+
raise ValueError(f"interval_seconds must be positive, got {seconds}")
62+
return seconds
63+
64+
# <number><unit> form e.g. "45s", "2h".
65+
if len(key) >= 2 and key[-1] in _UNIT_SECONDS and key[:-1].isdigit():
66+
seconds = int(key[:-1]) * _UNIT_SECONDS[key[-1]]
67+
if seconds <= 0:
68+
raise ValueError(f"interval_seconds must be positive, got {seconds}")
69+
return seconds
70+
71+
raise ValueError(
72+
f"Unrecognized interval {interval!r}. Use seconds (int), a named alias "
73+
f"('5m', '1h', 'daily'), or '<n><unit>' where unit is s/m/h/d."
74+
)
75+
76+
77+
def build_attribution_headers(
78+
source: Optional[str] = None,
79+
tool: Optional[str] = None,
80+
) -> Dict[str, str]:
81+
"""Build the X-OPA-Source / X-OPA-Tool attribution headers.
82+
83+
Defaults ``source`` to ``sdk-python``; omits the tool header when unset.
84+
"""
85+
headers: Dict[str, str] = {"X-OPA-Source": source or DEFAULT_SOURCE}
86+
if tool:
87+
headers["X-OPA-Tool"] = tool
88+
return headers
89+
90+
91+
def build_create_body(
92+
codes: List[str],
93+
interval: Union[str, int],
94+
name: Optional[str] = None,
95+
) -> Dict[str, Any]:
96+
"""Build the POST /v1/subscriptions request body from friendly args."""
97+
body: Dict[str, Any] = {
98+
"codes": list(codes),
99+
"interval_seconds": normalize_interval(interval),
100+
}
101+
if name is not None:
102+
body["name"] = name
103+
return body
104+
105+
106+
def unwrap_data(response: Any) -> Dict[str, Any]:
107+
"""Return the ``data`` object from a ``{status, data}`` envelope."""
108+
if isinstance(response, dict) and "data" in response:
109+
data = response["data"]
110+
return data if isinstance(data, dict) else {}
111+
return response if isinstance(response, dict) else {}

oilpriceapi/async_client.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
logger = logging.getLogger(__name__)
1919

20+
from ._subscriptions_common import unwrap_data
2021
from .async_resources import (
2122
AsyncAlertsResource,
2223
AsyncAnalyticsResource,
@@ -31,6 +32,7 @@
3132
AsyncFuturesResource,
3233
AsyncRigCountsResource,
3334
AsyncStorageResource,
35+
AsyncSubscriptionsResource,
3436
AsyncWebhooksResource,
3537
)
3638
from .exceptions import (
@@ -42,7 +44,7 @@
4244
ServerError,
4345
TimeoutError,
4446
)
45-
from .models import HistoricalPrice, HistoricalResponse, Price
47+
from .models import HistoricalPrice, HistoricalResponse, MarketBrief, Price
4648
from .retry import RetryStrategy
4749

4850

@@ -148,6 +150,8 @@ def __init__(
148150
self.ei = AsyncEnergyIntelligenceResource(self)
149151
self.webhooks = AsyncWebhooksResource(self)
150152
self.data_sources = AsyncDataSourcesResource(self)
153+
# Agent watch subscriptions + event polling (#3245 Phase 2).
154+
self.subscriptions = AsyncSubscriptionsResource(self)
151155

152156
# Real-time WebSocket streaming namespace (requires the [stream] extra).
153157
# Lazily imports `websockets` only when a stream is actually opened.
@@ -342,6 +346,34 @@ def _parse_rate_limit_reset(self, headers: httpx.Headers) -> Optional[datetime]:
342346
pass
343347
return None
344348

349+
async def market_brief(
350+
self,
351+
codes: List[str],
352+
narrative: bool = False,
353+
) -> MarketBrief:
354+
"""Get a multi-commodity structured (+ optional narrative) market brief.
355+
356+
Composes existing price/forecast data for the given commodity codes into
357+
a single structured summary (#3245 Phase 1a). Counts as one request.
358+
359+
Args:
360+
codes: Commodity codes to include (e.g. ["BRENT_CRUDE_USD", "WTI"]).
361+
narrative: When True, request a natural-language narrative as well.
362+
363+
Returns:
364+
A MarketBrief model.
365+
"""
366+
params: Dict[str, Any] = {"codes": ",".join(codes)}
367+
if narrative:
368+
params["narrative"] = "true"
369+
370+
response = await self.request(
371+
method="GET",
372+
path="/v1/market-brief",
373+
params=params,
374+
)
375+
return MarketBrief(**unwrap_data(response))
376+
345377
async def close(self):
346378
"""Close the HTTP client and flush telemetry."""
347379
self._telemetry.close()

oilpriceapi/async_resources.py

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@
33
from datetime import date, datetime
44
from typing import Any, Dict, List, Optional, Union
55

6+
from ._subscriptions_common import (
7+
build_attribution_headers,
8+
build_create_body,
9+
unwrap_data,
10+
)
611
from .exceptions import ValidationError
7-
from .models import DieselPrice, DieselStationsResponse, PriceAlert
12+
from .models import DieselPrice, DieselStationsResponse, PriceAlert, Subscription, SubscriptionEvent
813
from .resource_validators import VALID_OPERATORS, format_date
914
from .resources._futures_slug import normalize_futures_slug
15+
from .resources.subscriptions import SubscriptionEventsPage
1016

1117

1218
class AsyncDieselResource:
@@ -1403,3 +1409,83 @@ async def rotate_credentials(self, source_id: str, new_credentials: Dict[str, An
14031409
if "data" in response:
14041410
return response["data"]
14051411
return response
1412+
1413+
1414+
class AsyncSubscriptionsResource:
1415+
"""Async resource for agent-subscription CRUD and event polling (#3245)."""
1416+
1417+
def __init__(self, client: Any) -> None:
1418+
self.client = client
1419+
1420+
async def list(self) -> List[Subscription]:
1421+
"""List all subscriptions for the authenticated user."""
1422+
response = await self.client.request(method="GET", path="/v1/subscriptions")
1423+
data = unwrap_data(response)
1424+
subs = data.get("subscriptions", [])
1425+
return [Subscription(**s) for s in subs]
1426+
1427+
async def create(
1428+
self,
1429+
codes: List[str],
1430+
interval: Union[str, int],
1431+
name: Optional[str] = None,
1432+
source: Optional[str] = None,
1433+
tool: Optional[str] = None,
1434+
) -> Subscription:
1435+
"""Create a new subscription (watch).
1436+
1437+
Args:
1438+
codes: Commodity codes to watch (e.g. ["BRENT_CRUDE_USD"]).
1439+
interval: Friendly interval ("5m", "1h", "daily") or seconds (int).
1440+
name: Optional human-friendly name.
1441+
source: Attribution source header (defaults to "sdk-python").
1442+
tool: Optional attribution tool name header.
1443+
"""
1444+
body = build_create_body(codes, interval, name=name)
1445+
headers = build_attribution_headers(source=source, tool=tool)
1446+
response = await self.client.request(
1447+
method="POST",
1448+
path="/v1/subscriptions",
1449+
json_data=body,
1450+
headers=headers,
1451+
)
1452+
data = unwrap_data(response)
1453+
sub = data.get("subscription", data)
1454+
return Subscription(**sub)
1455+
1456+
async def delete(self, subscription_id: str) -> bool:
1457+
"""Delete a subscription. Returns True on success."""
1458+
await self.client.request(
1459+
method="DELETE",
1460+
path=f"/v1/subscriptions/{subscription_id}",
1461+
)
1462+
return True
1463+
1464+
async def events(
1465+
self,
1466+
since: Optional[int] = None,
1467+
limit: Optional[int] = None,
1468+
watch_id: Optional[str] = None,
1469+
) -> SubscriptionEventsPage:
1470+
"""Poll for subscription events newer than a cursor.
1471+
1472+
Returns a SubscriptionEventsPage with events, cursor, and has_more.
1473+
"""
1474+
params: Dict[str, Any] = {}
1475+
if since is not None:
1476+
params["since"] = since
1477+
if limit is not None:
1478+
params["limit"] = limit
1479+
if watch_id is not None:
1480+
params["watch_id"] = watch_id
1481+
1482+
response = await self.client.request(
1483+
method="GET",
1484+
path="/v1/subscriptions/events",
1485+
params=params,
1486+
)
1487+
data = unwrap_data(response)
1488+
events = [SubscriptionEvent(**e) for e in data.get("events", [])]
1489+
cursor = data.get("cursor")
1490+
has_more = bool(data.get("has_more", False))
1491+
return SubscriptionEventsPage(events=events, cursor=cursor, has_more=has_more)

0 commit comments

Comments
 (0)