Skip to content

Commit f65ec6e

Browse files
Resolve potential data being cut off in training data consolidation
1 parent efcbec6 commit f65ec6e

File tree

3 files changed

+19
-8
lines changed

3 files changed

+19
-8
lines changed

src/service_ml_forecast/clients/openremote/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class AssetDatapoint(BaseModel):
5959
"""Data point of an asset attribute.
6060
6161
Args:
62-
x: The timestamp of the data point.
62+
x: The timestamp of the data point in milliseconds.
6363
y: The value of the data point.
6464
"""
6565

src/service_ml_forecast/services/model_scheduler.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#
1616
# SPDX-License-Identifier: AGPL-3.0-or-later
1717

18+
import datetime
1819
import logging
1920
import time
2021

@@ -203,10 +204,16 @@ def _model_training_job(config: ModelConfig, data_service: OpenRemoteService) ->
203204
# Save the model
204205
provider.save_model(model)
205206

207+
# Log the first and last datapoint datetimes of the target attribute
208+
target_first_datapoint_datetime = datetime.datetime.fromtimestamp(training_dataset.target.datapoints[0].x / 1000)
209+
target_last_datapoint_datetime = datetime.datetime.fromtimestamp(training_dataset.target.datapoints[-1].x / 1000)
210+
206211
end_time = time.perf_counter()
207212
logger.info(
208213
f"Training job for {config.id} completed - duration: {end_time - start_time}s. "
209-
f"Type: {config.type}, Training Interval: {config.training_interval}"
214+
f"Type: {config.type}, Training Interval: {config.training_interval}, "
215+
f"Target first datapoint datetime: {target_first_datapoint_datetime}, "
216+
f"Target last datapoint datetime: {target_last_datapoint_datetime}"
210217
)
211218

212219

@@ -248,8 +255,14 @@ def _model_forecast_job(config: ModelConfig, data_service: OpenRemoteService) ->
248255
return
249256

250257
end_time = time.perf_counter()
258+
259+
# Log the first and last datapoint datetimes of the forecast
260+
first_datapoint_datetime = datetime.datetime.fromtimestamp(forecast.datapoints[0].x / 1000)
261+
last_datapoint_datetime = datetime.datetime.fromtimestamp(forecast.datapoints[-1].x / 1000)
262+
251263
logger.info(
252264
f"Forecasting job for {config.id} completed - duration: {end_time - start_time}s. "
253265
f"Wrote {len(forecast.datapoints)} datapoints. "
266+
f"First datapoint datetime: {first_datapoint_datetime}, Last datapoint datetime: {last_datapoint_datetime}"
254267
f"Asset ID: {config.target.asset_id}, Attribute: {config.target.attribute_name}"
255268
)

src/service_ml_forecast/services/openremote_service.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,9 @@ def __get_historical_datapoints(
152152
logger.info(
153153
f"Chunking datapoint retrieval into {months_diff} monthly chunks for {asset_id} {attribute_name}"
154154
)
155-
for i in range(months_diff):
155+
156+
# Continue until we've processed a chunk that ends at or after to_timestamp
157+
while current_from < to_timestamp:
156158
# Calculate the end timestamp for this chunk (1 month from current_from)
157159
current_to = TimeUtil.add_months_to_timestamp(current_from, 1)
158160

@@ -166,7 +168,7 @@ def __get_historical_datapoints(
166168
if chunk_datapoints is None:
167169
logger.error(
168170
f"Failed to retrieve historical datapoints for {asset_id} {attribute_name} "
169-
f"for chunk {i + 1} of {months_diff}"
171+
f"for chunk ending at {current_to}"
170172
)
171173
return None
172174

@@ -175,10 +177,6 @@ def __get_historical_datapoints(
175177
# Move to the next chunk
176178
current_from = current_to
177179

178-
# Break if we've reached the end
179-
if current_from >= to_timestamp:
180-
break
181-
182180
return all_datapoints
183181

184182
def get_forecast_dataset(self, config: ModelConfig) -> ForecastDataSet | None:

0 commit comments

Comments
 (0)