Commit 07bf8cd

Merge pull request #44 from splightplatform/mgaragiola/hypersim-reader
Mgaragiola/hypersim reader
2 parents 6e663e6 + 828c0ee commit 07bf8cd

12 files changed: +1712 -259 lines changed


simulation/hypersim/__init__.py

Whitespace-only changes.
Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
from datetime import datetime, timezone
from logging import getLogger

from hypersim.interfaces import DataReader, DataSaver

logger = getLogger("HypersimOperator")


class HypersimConnector:
    def __init__(self, reader: DataReader):
        self._reader = reader
        self._data_savers: list[DataSaver] = []

    def add_data_saver(
        self,
        saver: DataSaver,
    ) -> None:
        self._data_savers.append(saver)

    def process(self) -> None:
        logger.debug("Reading data from Hypersim")
        now = datetime.now(timezone.utc)
        data = self._reader.read()
        for saver in self._data_savers:
            saver.process_data(data, now)
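
For context, a minimal wiring sketch of the connector (not part of the commit): the import path for HypersimConnector is an assumption, since the file name for this diff is not shown above, and the sensor, attribute, and asset identifiers are placeholders.

from splight_lib.models import Asset

from hypersim.connector import HypersimConnector  # assumed module path
from hypersim.data_saver import DeviceDataSaver
from hypersim.reader import HypersimDataReader

# Placeholder names: "BUS1.Vrms" sensor, "voltage" attribute, "asset-id" asset.
reader = HypersimDataReader(sensors=["BUS1.Vrms"])
saver = DeviceDataSaver(Asset.retrieve("asset-id"))
saver.add_attribute("voltage", "BUS1.Vrms")

connector = HypersimConnector(reader)
connector.add_data_saver(saver)

reader.update_data()  # refresh the cached sensor snapshot from Hypersim
connector.process()   # fan the snapshot out to every registered saver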

simulation/hypersim/data_saver.py

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
from datetime import datetime
from logging import getLogger

from splight_lib.models import Asset

# TODO: Update to use v4 models
from splight_lib.models._v3.native import Number

logger = getLogger("HypersimOperator")


class DeviceDataSaver:
    def __init__(self, asset: Asset):
        self._asset = asset
        self._attributes = {item.name: item for item in asset.attributes}
        # Maps a sensor name to the asset attribute object it feeds
        self._attr_sensor_map: dict = {}

    def add_attribute(self, attribute: str, sensor: str) -> None:
        if attribute not in self._attributes:
            raise ValueError(f"Attribute {attribute} not found in asset.")
        if sensor in self._attr_sensor_map:
            logger.debug(f"Sensor {sensor} already mapped to an attribute.")
        self._attr_sensor_map[sensor] = self._attributes[attribute]

    def process_data(
        self, data: dict[str, float], date: datetime
    ) -> None:
        for sensor, attribute in self._attr_sensor_map.items():
            sensor_value = data.get(sensor, None)
            if sensor_value is None:
                logger.debug(f"Sensor {sensor} not found in data.")
                continue
            logger.debug(
                f"Saving data for sensor {sensor} with value {sensor_value}"
            )
            attr_value = Number(
                timestamp=date,
                asset=self._asset.id,
                attribute=attribute.id,
                value=sensor_value,
            )
            attr_value.save()

simulation/hypersim/interfaces.py

Lines changed: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
from datetime import datetime
from typing import Protocol


class DataReader(Protocol):
    def read(self) -> dict[str, float]:
        ...


class DataSaver(Protocol):
    def process_data(self, data: dict[str, float], date: datetime) -> None:
        ...
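
Because these are typing.Protocol classes, any object with matching method signatures satisfies them structurally; no inheritance is needed. A throwaway in-memory pair (illustrative only, not part of the commit) that could stand in for the real reader and saver in tests:

from datetime import datetime


class FakeReader:
    def read(self) -> dict[str, float]:
        # Hypothetical sensor snapshot
        return {"BUS1.Vrms": 1.02}


class CapturingSaver:
    def __init__(self) -> None:
        self.samples: list[tuple[dict[str, float], datetime]] = []

    def process_data(self, data: dict[str, float], date: datetime) -> None:
        # Record instead of persisting to the platform
        self.samples.append((data, date))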

simulation/hypersim/operator.py

Lines changed: 205 additions & 0 deletions
@@ -0,0 +1,205 @@
from datetime import datetime, timezone
from logging import getLogger
from time import time
from typing import Optional, Tuple, TypedDict

import HyWorksApiGRPC as HyWorksApi
import requests
from splight_lib.models import Asset
from splight_lib.settings import workspace_settings

from hypersim.data_saver import DeviceDataSaver
from hypersim.interfaces import DataReader
from hypersim.reader import AssetAttributeDataReader

logger = getLogger("HypersimOperator")
GENERATOR_VECTOR_NAME = "generators_vector"
LINE_CONTINGENCY = "contingency"


class LineInfo(TypedDict):
    name: str
    asset: str
    breaker: str


class GeneratorInfo(TypedDict):
    name: str
    asset: str
    breaker: str


def set_device_value(
    device: str, variable: str, value: str | int | float
) -> None:
    """Sets the value of a device variable in Hypersim."""
    HyWorksApi.setComponentParameter(device, variable, str(value))
    logger.debug(f"Setting {device}.{variable} to {value}")


class DCMHypersimOperator:
    def __init__(
        self,
        grid: str,
        lines: list[LineInfo],
        generators: list[GeneratorInfo],
        hypersim_reader: DataReader,
    ):
        self._grid = grid
        addresses = []
        breakers = []
        self._savers = []
        for line_info in lines:
            line = Asset.retrieve(line_info["asset"])
            attrs = {attr.name: attr for attr in line.attributes}
            address = {
                "asset": line.id,
                "attribute": attrs[GENERATOR_VECTOR_NAME].id,
            }
            addresses.append(address)
            breakers.append(line_info["breaker"])
            data_saver = DeviceDataSaver(line)
            data_saver.add_attribute(LINE_CONTINGENCY, line_info["breaker"])
            self._savers.append(data_saver)
        self._lines = lines
        self._lines_breakers = breakers
        self._generators = {item["name"]: item for item in generators}
        self._generators_vector: dict[str, dict[str, int]] = {}
        self._hy_reader = hypersim_reader
        self._spl_reader = AssetAttributeDataReader(
            addresses, data_type="String", limit=1
        )

        self._in_contingency = False
        self._last_contingency: datetime | None = None

    def run(self) -> None:
        t0 = time()
        contingency = self._check_for_contingency()
        new_contingency = self._process_contingency(contingency)
        t1 = time()
        if new_contingency:
            logger.info(
                f"\n\n\nOperation time: {(t1 - t0) * 1000:.3f} ms\n\n\n"
            )

    def _process_contingency(
        self, contingency: Optional[Tuple[str, int]]
    ) -> bool:
        new_contingency = False
        if self._in_contingency:
            if contingency is None:
                logger.info("Recovering system from contingency")
                self._recover_system()
                self._in_contingency = False
            else:
                logger.info(
                    "System still in contingency. Waiting for recovery"
                )
        else:
            if contingency is not None:
                logger.info(f"Contingency found on line {contingency[0]}")
                self._run_operation(contingency[0])
                self._set_contingency(contingency[0])
                new_contingency = True
                self._in_contingency = True
                self._last_contingency = datetime.now(timezone.utc)
            else:
                logger.debug("No contingency found.")
        return new_contingency

    def _set_contingency(self, line_breaker: str) -> None:
        date = datetime.now(timezone.utc)
        for saver in self._savers:
            saver.process_data({line_breaker: True}, date)

    def _check_for_contingency(self) -> Optional[Tuple[str, int]]:
        breakers_status = self._hy_reader.read()
        line_in_contingency = next(
            filter(
                lambda x: x[1] == 0 and x[0] in self._lines_breakers,
                breakers_status.items(),
            ),
            None,
        )
        return line_in_contingency

    def update_operation_vectors(self) -> None:
        data = self._spl_reader.read()
        for line_id, vector in data.items():
            line_info = next(
                filter(lambda x: x["asset"] == line_id, self._lines)
            )
            self._generators_vector.update(
                {line_info["breaker"]: self._parse_generator_vector(vector)}
            )
        logger.info(f"Operation vectors updated: {self._generators_vector}")

    def _run_operation(self, line_breaker: str) -> None:
        vector = self._generators_vector.get(line_breaker, None)
        if vector is None:
            line_name = next(
                filter(lambda x: x["breaker"] == line_breaker, self._lines)
            )["name"]
            raise ValueError(f"No operation vector found for line {line_name}")
        self._apply_vector(vector)

    def _apply_vector(self, vector: dict[str, int]) -> None:
        logger.info(f"Applying operation vector: {vector}")
        for gen_name, value in vector.items():
            if value == 0:
                continue
            # In Hypersim, the setpoint is 0 to open the breaker and 7 to
            # close it
            setpoint = 0 if value == 1 else 7
            generator = self._generators.get(gen_name, None)
            # TODO: Check if generator is None
            block, variable = generator["breaker"].split(".")
            set_device_value(block, variable, setpoint)
            logger.debug(f"Setting generator {gen_name} to {setpoint}")

    def _parse_generator_vector(self, vector: str) -> dict[str, int]:
        generator_ordering = self._fetch_gen_ordering(self._grid)
        splitted_vector = [int(x) for x in vector.split(",")]
        sorted_gens = []
        for gen in generator_ordering:
            gen_id = gen["id"]
            full_gen = next(
                filter(
                    lambda x: x["asset"] == gen_id, self._generators.values()
                )
            )
            sorted_gens.append(full_gen)
        parsed_vector = {
            gen["name"]: value
            for gen, value in zip(sorted_gens, splitted_vector)
        }
        return parsed_vector

    def _recover_system(self) -> None:
        date = datetime.now(timezone.utc)
        for generator in self._generators.values():
            block, variable = generator["breaker"].split(".")
            set_device_value(block, variable, 7)
            logger.debug(f"Closing breaker for generator {generator['name']}")

        for saver in self._savers:
            saver.process_data(
                {breaker: False for breaker in self._lines_breakers}, date
            )

    @staticmethod
    def _fetch_gen_ordering(grid_id: str) -> list[dict]:
        host = workspace_settings.SPLIGHT_PLATFORM_API_HOST
        url = f"{host}/v4/engine/asset/grids/{grid_id}/structure/"
        access_id = workspace_settings.SPLIGHT_ACCESS_ID
        secret_key = workspace_settings.SPLIGHT_SECRET_KEY
        header = {
            "Authorization": f"Splight {access_id} {secret_key}",
        }
        response = requests.get(url, headers=header)
        response.raise_for_status()
        data = response.json()
        return data["generators"]
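
A rough driving-loop sketch for the operator (illustrative only; the grid id, line and generator definitions, and polling cadence are assumptions, and a running Hypersim instance plus platform credentials are required). The generator vector string stored on each line asset is parsed against the generator ordering returned by the grid structure endpoint, so the mapping in the comment below assumes that ordering is G1, G2.

from time import sleep

from hypersim.operator import DCMHypersimOperator
from hypersim.reader import HypersimDataReader

# Placeholder topology: one monitored line and two generators.
lines = [{"name": "L1", "asset": "line-asset-id", "breaker": "L1.brk"}]
generators = [
    {"name": "G1", "asset": "gen1-asset-id", "breaker": "G1_BLOCK.brk"},
    {"name": "G2", "asset": "gen2-asset-id", "breaker": "G2_BLOCK.brk"},
]

reader = HypersimDataReader(sensors=[line["breaker"] for line in lines])
operator = DCMHypersimOperator("grid-id", lines, generators, reader)

# A stored vector of "1,0" would parse to {"G1": 1, "G2": 0}; on a
# contingency, G1's breaker is driven to setpoint 0 (open) and G2 skipped.
operator.update_operation_vectors()

while True:
    reader.update_data()  # refresh breaker statuses from Hypersim
    operator.run()        # react if a monitored line breaker reads 0
    sleep(1)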

simulation/hypersim/reader.py

Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
from datetime import datetime, timedelta, timezone
from logging import getLogger
from typing import TypedDict

import HyWorksApiGRPC as HyWorksApi
from splight_lib.models._v3.datalake import DataRequest, PipelineStep, Trace
from splight_lib.models._v3.native import Boolean, Number, String
from tenacity import retry, wait_fixed

logger = getLogger("HypersimOperator")

TYPE_MAP = {
    "number": Number,
    "string": String,
    "boolean": Boolean,
}


class DataAddress(TypedDict):
    asset: str
    attribute: str


class HypersimDataReader:
    def __init__(self, sensors: list[str] | None = None):
        self._connect()
        self._sensors: set[str] = set(sensors) if sensors else set()
        self._data: dict[str, float] = {}

    def add_sensor(self, sensor: str) -> None:
        if sensor in self._sensors:
            logger.debug(f"Sensor {sensor} already added.")
        self._sensors.add(sensor)

    def update_data(self) -> None:
        try:
            values = self._read_sensor_values()
        except Exception as e:
            logger.error(f"Error reading sensors: {e}")
            self._connect()
            raise e
        if len(values) != len(self._sensors):
            raise ValueError(
                (
                    "An error occurred while reading sensors. The number of "
                    "read values does not match the number of sensors."
                )
            )
        self._data = {key: value for key, value in zip(self._sensors, values)}

    def read(self) -> dict[str, float]:
        return self._data

    @retry(wait=wait_fixed(0.1))
    def _read_sensor_values(self) -> list[float]:
        values = HyWorksApi.getLastSensorValues(list(self._sensors))
        return values

    def _connect(self) -> None:
        HyWorksApi.startAndConnectHypersim()
        return None


class AssetAttributeDataReader:
    def __init__(
        self,
        addresses: list[DataAddress],
        data_type: str = "Number",
        limit: int = 1000,
    ):
        self._addresses = addresses
        self._limit = limit
        self._data_class = TYPE_MAP.get(data_type.lower(), Number)

    def add_address(self, address: DataAddress) -> None:
        # TODO: validate if address already exists
        self._addresses.append(address)

    def read(self):
        now = datetime.now(timezone.utc)
        from_ts = now - timedelta(minutes=30)
        request = DataRequest[self._data_class](
            from_timestamp=from_ts,
            to_timestamp=now,
        )
        for address in self._addresses:
            trace = Trace.from_address(address["asset"], address["attribute"])
            trace.add_step(PipelineStep.from_dict({"$limit": self._limit}))
            request.add_trace(trace)
        data = request.apply()
        if len(data) != len(self._addresses):
            raise ValueError(
                (
                    "An error occurred while reading attributes. "
                    "The number of read values does not match the "
                    "number of addresses."
                )
            )
        return {item.asset: item.value for item in data}
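
A compact sketch of using the two readers directly (the asset, attribute, and sensor identifiers are placeholders; a running Hypersim instance and valid Splight credentials are assumed, and the printed values are hypothetical):

from hypersim.reader import AssetAttributeDataReader, HypersimDataReader

# Live values straight from the running Hypersim simulation.
hy_reader = HypersimDataReader(sensors=["L1.brk", "BUS1.Vrms"])
hy_reader.update_data()
print(hy_reader.read())  # e.g. {"L1.brk": 1.0, "BUS1.Vrms": 1.02}

# Latest stored String attribute (e.g. a generators_vector) per asset,
# pulled from the Splight datalake with one sample per trace.
spl_reader = AssetAttributeDataReader(
    [{"asset": "line-asset-id", "attribute": "vector-attribute-id"}],
    data_type="String",
    limit=1,
)
print(spl_reader.read())  # e.g. {"line-asset-id": "1,0"}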
