-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathmodels.py
More file actions
510 lines (387 loc) · 20.5 KB
/
Copy pathmodels.py
File metadata and controls
510 lines (387 loc) · 20.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
"""
OilPriceAPI Data Models
Pydantic models for API responses.
"""
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, field_validator
class Price(BaseModel):
"""Single price data point."""
model_config = ConfigDict(populate_by_name=True)
commodity: str = Field(description="Commodity code (e.g., BRENT_CRUDE_USD)")
value: float = Field(description="Current price value")
currency: Optional[str] = Field(default=None, description="Currency code (may be absent in minimal API responses)")
unit: str = Field(description="Unit of measurement")
timestamp: datetime = Field(description="Price timestamp")
change: Optional[float] = Field(default=None, description="Price change amount")
change_percent: Optional[float] = Field(default=None, alias="change_percentage", description="Percentage change")
previous_close: Optional[float] = Field(default=None, description="Previous closing price")
open: Optional[float] = Field(default=None, description="Opening price")
high: Optional[float] = Field(default=None, description="Daily high")
low: Optional[float] = Field(default=None, description="Daily low")
volume: Optional[int] = Field(default=None, description="Trading volume")
@field_validator('timestamp', mode='before')
@classmethod
def parse_timestamp(cls, v):
"""Parse timestamp from various formats."""
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
# Try other common formats
from dateutil import parser
return parser.parse(v)
return v
@property
def is_up(self) -> bool:
"""Check if price is up."""
return self.change is not None and self.change > 0
@property
def is_down(self) -> bool:
"""Check if price is down."""
return self.change is not None and self.change < 0
def __str__(self) -> str:
"""String representation."""
change_str = ""
if self.change_percent is not None:
arrow = "↑" if self.is_up else "↓" if self.is_down else "→"
change_str = f" {arrow} {abs(self.change_percent):.2f}%"
currency_str = self.currency or ""
return f"{self.commodity}: {currency_str}{self.value:.2f}{change_str}"
class PriceResponse(BaseModel):
"""Response from current price endpoint."""
model_config = ConfigDict(populate_by_name=True)
success: bool = Field(default=True)
data: Price
timestamp: datetime = Field(description="Response timestamp")
class MultiplePricesResponse(BaseModel):
"""Response with multiple prices."""
model_config = ConfigDict(populate_by_name=True)
success: bool = Field(default=True)
data: List[Price]
timestamp: datetime
count: int = Field(description="Number of prices returned")
class HistoricalPrice(BaseModel):
"""Historical price data point."""
model_config = ConfigDict(populate_by_name=True)
date: datetime = Field(alias="created_at")
commodity: str = Field(alias="commodity_name")
value: float = Field(alias="price")
currency: Optional[str] = Field(default=None, description="Currency code from API response (may be absent)")
unit: str = Field(alias="unit_of_measure")
type_name: Optional[str] = Field(default="spot_price")
@field_validator('date', mode='before')
@classmethod
def parse_date(cls, v):
"""Parse date from various formats."""
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class HistoricalResponse(BaseModel):
"""Response from historical data endpoint."""
model_config = ConfigDict(populate_by_name=True)
success: bool = Field(default=True)
data: List[HistoricalPrice]
meta: Optional['PaginationMeta'] = None
class PaginationMeta(BaseModel):
"""Pagination metadata."""
model_config = ConfigDict(populate_by_name=True)
page: int = Field(description="Current page number")
per_page: int = Field(description="Items per page")
total: int = Field(description="Total number of items")
total_pages: int = Field(description="Total number of pages")
has_next: bool = Field(description="Has next page")
has_prev: bool = Field(description="Has previous page")
# Update forward reference
HistoricalResponse.model_rebuild()
class Commodity(BaseModel):
"""Commodity information."""
model_config = ConfigDict(populate_by_name=True)
code: str = Field(description="Commodity code")
name: str = Field(description="Commodity name")
category: str = Field(description="Category (oil, gas, refined)")
unit: str = Field(description="Unit of measurement")
currency: str = Field(description="Default currency")
description: Optional[str] = Field(default=None)
class CommodityListResponse(BaseModel):
"""Response with available commodities."""
model_config = ConfigDict(populate_by_name=True)
success: bool = Field(default=True)
data: List[Commodity]
count: int
class ApiStatus(BaseModel):
"""API status information."""
model_config = ConfigDict(populate_by_name=True)
status: str = Field(description="API status (operational, degraded, down)")
version: str = Field(description="API version")
timestamp: datetime
uptime: Optional[float] = Field(default=None, description="Uptime percentage")
response_time: Optional[float] = Field(default=None, description="Average response time (ms)")
class UsageStats(BaseModel):
"""API usage statistics."""
model_config = ConfigDict(populate_by_name=True)
requests_today: int = Field(description="Requests made today")
requests_this_month: int = Field(description="Requests this month")
limit_daily: Optional[int] = Field(default=None, description="Daily request limit")
limit_monthly: int = Field(description="Monthly request limit")
remaining_today: Optional[int] = Field(default=None)
remaining_this_month: int
reset_at: datetime = Field(description="When limits reset")
plan: str = Field(description="Current plan name")
class DieselPrice(BaseModel):
"""State average diesel price data."""
model_config = ConfigDict(populate_by_name=True)
state: str = Field(description="Two-letter US state code (e.g., CA, TX)")
price: float = Field(description="Average diesel price in USD per gallon")
currency: str = Field(description="Currency code (always USD)")
unit: str = Field(description="Unit of measurement (always gallon)")
granularity: str = Field(description="Data granularity level (e.g., state, national)")
source: str = Field(description="Data source (e.g., EIA)")
updated_at: datetime = Field(description="Last update timestamp")
cached: Optional[bool] = Field(default=None, description="Whether served from cache")
@field_validator('updated_at', mode='before')
@classmethod
def parse_updated_at(cls, v):
"""Parse updated_at from various formats."""
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class DieselStationLocation(BaseModel):
"""Geographic coordinates for a diesel station."""
model_config = ConfigDict(populate_by_name=True)
lat: float = Field(description="Latitude")
lng: float = Field(description="Longitude")
class DieselStation(BaseModel):
"""Diesel station with pricing information."""
model_config = ConfigDict(populate_by_name=True)
name: str = Field(description="Station name")
address: str = Field(description="Full street address")
location: DieselStationLocation = Field(description="Geographic coordinates")
diesel_price: float = Field(description="Diesel price at this station (USD/gallon)")
formatted_price: str = Field(description="Formatted price string (e.g., $3.89)")
currency: str = Field(description="Currency code (always USD)")
unit: str = Field(description="Unit of measurement (always gallon)")
price_delta: Optional[float] = Field(default=None, description="Price difference from regional average")
price_vs_average: Optional[str] = Field(default=None, description="Human-readable price comparison")
fuel_types: Optional[List[str]] = Field(default=None, description="Available fuel types")
last_updated: Optional[datetime] = Field(default=None, description="Last price update timestamp")
@field_validator('last_updated', mode='before')
@classmethod
def parse_last_updated(cls, v):
"""Parse last_updated from various formats."""
if v is None:
return None
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class DieselRegionalAverage(BaseModel):
"""Regional average diesel price for comparison."""
model_config = ConfigDict(populate_by_name=True)
price: float = Field(description="Regional average price")
currency: str = Field(description="Currency code")
unit: str = Field(description="Unit of measurement")
region: str = Field(description="Region name")
granularity: str = Field(description="Granularity level")
source: str = Field(description="Data source")
class DieselSearchArea(BaseModel):
"""Search area details for station query."""
model_config = ConfigDict(populate_by_name=True)
center: DieselStationLocation = Field(description="Search center coordinates")
radius_meters: float = Field(description="Search radius in meters")
radius_miles: float = Field(description="Search radius in miles")
class DieselStationsMetadata(BaseModel):
"""Metadata about diesel stations query."""
model_config = ConfigDict(populate_by_name=True)
total_stations: int = Field(description="Number of stations found")
source: str = Field(description="Data source")
cached: bool = Field(description="Whether served from cache")
api_cost: float = Field(description="API query cost in dollars")
timestamp: datetime = Field(description="Response timestamp")
cache_age_hours: Optional[float] = Field(default=None, description="Cache age in hours")
@field_validator('timestamp', mode='before')
@classmethod
def parse_timestamp(cls, v):
"""Parse timestamp from various formats."""
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class DieselStationsResponse(BaseModel):
"""Response from diesel stations endpoint."""
model_config = ConfigDict(populate_by_name=True)
regional_average: DieselRegionalAverage = Field(description="Regional average for comparison")
stations: List[DieselStation] = Field(description="List of nearby stations")
search_area: DieselSearchArea = Field(description="Search area details")
metadata: DieselStationsMetadata = Field(description="Query metadata")
class PriceAlert(BaseModel):
"""Price alert configuration and status."""
model_config = ConfigDict(populate_by_name=True)
id: str = Field(description="Unique alert identifier")
name: str = Field(description="User-friendly alert name")
commodity_code: str = Field(description="Commodity code to monitor (e.g., BRENT_CRUDE_USD)")
condition_operator: str = Field(description="Comparison operator (greater_than, less_than, equals, etc.)")
condition_value: float = Field(description="Price threshold value in USD")
webhook_url: Optional[str] = Field(default=None, description="Optional webhook URL for notifications")
enabled: bool = Field(description="Whether the alert is active")
cooldown_minutes: int = Field(description="Minimum minutes between alert triggers (0-1440)")
metadata: Optional[Dict[str, Any]] = Field(default=None, description="Optional custom metadata")
trigger_count: int = Field(description="Number of times this alert has triggered")
last_triggered_at: Optional[datetime] = Field(default=None, description="Last trigger timestamp")
created_at: datetime = Field(description="Alert creation timestamp")
updated_at: datetime = Field(description="Last update timestamp")
@field_validator('last_triggered_at', 'created_at', 'updated_at', mode='before')
@classmethod
def parse_datetime(cls, v):
"""Parse datetime from various formats."""
if v is None:
return None
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class WebhookTestResponse(BaseModel):
"""Response from webhook test endpoint."""
model_config = ConfigDict(populate_by_name=True)
success: bool = Field(description="Test result status")
status_code: int = Field(description="HTTP status code from webhook endpoint")
response_time_ms: float = Field(description="Response time in milliseconds")
response_body: Optional[str] = Field(default=None, description="Response body from webhook")
error: Optional[str] = Field(default=None, description="Error message if test failed")
class MarketBriefForecast(BaseModel):
"""1-month forecast block attached to a market-brief commodity."""
model_config = ConfigDict(populate_by_name=True)
point: Optional[float] = Field(default=None, description="Central forecast value")
low: Optional[float] = Field(default=None, description="Lower bound of the forecast range")
high: Optional[float] = Field(default=None, description="Upper bound of the forecast range")
# The API historically sent a label ("high"/"medium"/"low") but now sends a
# numeric score (e.g. 0.65) — accept both. Caught by the live contract
# tests on 2026-07-03, first run with a real key.
confidence: Optional[Union[str, float]] = Field(
default=None, description="Confidence label (e.g. high) or numeric score (e.g. 0.65)"
)
class MarketBriefCommodity(BaseModel):
"""A single commodity entry within a market brief."""
model_config = ConfigDict(populate_by_name=True, extra="allow")
code: str = Field(description="Canonical commodity code (e.g. BRENT_CRUDE_USD)")
name: Optional[str] = Field(default=None, description="Human-readable commodity name")
price: Optional[float] = Field(default=None, description="Latest price value")
currency: Optional[str] = Field(default=None, description="Currency code")
change_24h_pct: Optional[float] = Field(default=None, description="24-hour percentage change")
forecast_1m: Optional[MarketBriefForecast] = Field(default=None, description="1-month forecast block")
stale: Optional[bool] = Field(default=None, description="Whether the price is considered stale")
class MarketBrief(BaseModel):
"""Multi-commodity structured (+ optional narrative) market summary.
Returned by ``client.market_brief(...)`` / ``await client.market_brief(...)``.
"""
model_config = ConfigDict(populate_by_name=True, extra="allow")
as_of: Optional[datetime] = Field(default=None, description="Timestamp the brief was composed")
codes: List[str] = Field(default_factory=list, description="Resolved commodity codes in the brief")
commodities: List[MarketBriefCommodity] = Field(
default_factory=list, description="Per-commodity structured data"
)
narrative: Optional[str] = Field(default=None, description="Optional natural-language narrative")
@field_validator("as_of", mode="before")
@classmethod
def parse_as_of(cls, v):
"""Parse as_of timestamp from various formats."""
if v is None:
return None
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace("Z", "+00:00"))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class Subscription(BaseModel):
"""An agent subscription ("watch") that periodically evaluates commodities."""
model_config = ConfigDict(populate_by_name=True, extra="allow")
id: str = Field(description="Unique subscription identifier")
name: Optional[str] = Field(default=None, description="User-friendly subscription name")
codes: List[str] = Field(default_factory=list, description="Commodity codes being watched")
interval_seconds: Optional[int] = Field(default=None, description="Evaluation interval in seconds")
status: Optional[str] = Field(default=None, description="Subscription status (active, paused, etc.)")
deliver_webhook: Optional[bool] = Field(default=None, description="Whether events are delivered via webhook")
source: Optional[str] = Field(default=None, description="Attribution source (sdk-python, mcp, api, etc.)")
tool_name: Optional[str] = Field(default=None, description="Attribution tool name")
last_evaluated_at: Optional[datetime] = Field(default=None, description="Last evaluation timestamp")
next_run_at: Optional[datetime] = Field(default=None, description="Next scheduled evaluation timestamp")
created_at: Optional[datetime] = Field(default=None, description="Creation timestamp")
@field_validator("last_evaluated_at", "next_run_at", "created_at", mode="before")
@classmethod
def parse_datetimes(cls, v):
"""Parse datetime fields from various formats."""
if v is None:
return None
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace("Z", "+00:00"))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class SubscriptionEvent(BaseModel):
"""A single event emitted by a subscription, returned from the poll endpoint."""
model_config = ConfigDict(populate_by_name=True, extra="allow")
seq: Optional[int] = Field(default=None, description="Monotonic per-user sequence cursor")
watch_id: Optional[str] = Field(default=None, description="Subscription (watch) that produced the event")
type: Optional[str] = Field(default=None, description="Event type")
code: Optional[str] = Field(default=None, description="Commodity code the event relates to")
payload: Optional[Dict[str, Any]] = Field(default=None, description="Event payload")
created_at: Optional[datetime] = Field(default=None, description="Event timestamp")
@field_validator("created_at", mode="before")
@classmethod
def parse_created_at(cls, v):
"""Parse created_at from various formats."""
if v is None:
return None
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace("Z", "+00:00"))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
class DataConnectorPrice(BaseModel):
"""Price from connected data source (BYOS - Bring Your Own Subscription)."""
model_config = ConfigDict(populate_by_name=True)
price: float = Field(description="Price value in specified currency")
currency: str = Field(description="Currency code (e.g., USD)")
fuel_type: str = Field(description="Fuel type (VLSFO, MGO, IFO380)")
port: str = Field(description="Port name")
region: Optional[str] = Field(default=None, description="Geographic region (AMERICAS, EMEA, APAC)")
unit: str = Field(description="Unit of measure (MT = metric ton)")
source: str = Field(description="Data provider (e.g., shipandbunker)")
timestamp: datetime = Field(description="ISO 8601 timestamp when price was recorded")
@field_validator('timestamp', mode='before')
@classmethod
def parse_timestamp(cls, v):
"""Parse timestamp from various formats."""
if isinstance(v, str):
try:
return datetime.fromisoformat(v.replace('Z', '+00:00'))
except ValueError:
from dateutil import parser
return parser.parse(v)
return v
def __str__(self) -> str:
"""String representation."""
return f"{self.fuel_type} @ {self.port}: {self.currency}{self.price:.2f}/{self.unit}"