This repository was archived by the owner on May 8, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathworkflows.py
More file actions
130 lines (106 loc) · 4.42 KB
/
workflows.py
File metadata and controls
130 lines (106 loc) · 4.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""Composite execution workflows wiring risk and compliance controls."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable
from .compliance import ComplianceMonitor, ComplianceReport, ComplianceViolation
from .risk import LimitViolation, OrderRateExceeded, RiskManager
@dataclass(slots=True, frozen=True)
class OrderRequest:
"""Lightweight representation of an order for workflow evaluation."""
symbol: str
side: str
quantity: float
price: float
@dataclass(slots=True, frozen=True)
class OrderAssessment:
"""Outcome of running risk and compliance against a single order."""
request: OrderRequest
compliance_report: ComplianceReport
risk_error: str | None = None
@property
def passed(self) -> bool:
return not self.compliance_report.blocked and self.risk_error is None
@dataclass(slots=True, frozen=True)
class WorkflowAssessment:
"""Collection of order outcomes generated by :class:`RiskComplianceWorkflow`."""
accepted: tuple[OrderAssessment, ...]
rejected: tuple[OrderAssessment, ...]
@property
def compliance_reports(self) -> tuple[ComplianceReport, ...]:
return tuple(
assessment.compliance_report
for assessment in (*self.accepted, *self.rejected)
)
@property
def passed(self) -> bool:
return not self.rejected
class RiskComplianceWorkflow:
"""Run compliance checks followed by risk validation for order batches."""
def __init__(
self, risk_manager: RiskManager, compliance_monitor: ComplianceMonitor
) -> None:
self._risk = risk_manager
self._compliance = compliance_monitor
def evaluate(self, orders: Iterable[OrderRequest]) -> WorkflowAssessment:
"""Evaluate ``orders`` returning structured outcomes."""
accepted: list[OrderAssessment] = []
rejected: list[OrderAssessment] = []
for order in orders:
try:
report = self._compliance.check(
order.symbol, order.quantity, order.price
)
except ComplianceViolation as exc:
report = exc.report
if report is None:
report = ComplianceReport(
symbol=order.symbol,
requested_quantity=order.quantity,
requested_price=order.price,
normalized_quantity=order.quantity,
normalized_price=order.price,
violations=(str(exc),),
blocked=True,
)
rejected.append(OrderAssessment(order, report))
continue
assessment = OrderAssessment(request=order, compliance_report=report)
if report.blocked:
rejected.append(assessment)
continue
try:
self._risk.validate_order(
order.symbol, order.side, order.quantity, order.price
)
except (LimitViolation, OrderRateExceeded) as exc:
rejected.append(OrderAssessment(order, report, str(exc)))
continue
except ComplianceViolation as exc: # pragma: no cover - defensive guard
rejected.append(OrderAssessment(order, report, str(exc)))
continue
accepted.append(assessment)
# Assume immediate fill to keep exposure tracking consistent for subsequent orders.
self._risk.register_fill(
order.symbol, order.side, order.quantity, order.price
)
return WorkflowAssessment(tuple(accepted), tuple(rejected))
def evaluate_from_dicts(
self, payloads: Iterable[dict[str, object]]
) -> WorkflowAssessment:
"""Helper accepting dictionaries (for fixtures and JSON payloads)."""
orders: list[OrderRequest] = []
for payload in payloads:
symbol = str(payload.get("symbol"))
side = str(payload.get("side", "buy")).lower()
quantity = float(payload.get("quantity", 0.0))
price = float(payload.get("price", 0.0))
orders.append(
OrderRequest(symbol=symbol, side=side, quantity=quantity, price=price)
)
return self.evaluate(orders)
__all__ = [
"OrderRequest",
"OrderAssessment",
"WorkflowAssessment",
"RiskComplianceWorkflow",
]