Skip to content

Commit dfd88ed

Browse files
author
Hendrik Muhs
authored
[ML] Feature/forecast scale (#89)
This implements the C++ side of forecast persistence. An additional parameter allows the forecast runner to persist models on disk for temporary purposes. Models are loaded back into memory one by one. For models smaller than the current limit of 20MB nothing changes.
1 parent d219bea commit dfd88ed

28 files changed

+699
-34
lines changed

bin/autodetect/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ INSTALL_DIR=$(CPP_PLATFORM_HOME)/bin
1212
ML_LIBS=$(LIB_ML_CORE) $(LIB_ML_MATHS) $(LIB_ML_MODEL) $(LIB_ML_API)
1313

1414
USE_BOOST=1
15+
USE_BOOST_FILESYSTEM_LIBS=1
1516
USE_BOOST_PROGRAMOPTIONS_LIBS=1
1617
USE_RAPIDJSON=1
1718
USE_EIGEN=1

docs/CHANGELOG.asciidoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ Improve and use periodic boundary condition for seasonal component modeling ({pu
3131
Improve robustness w.r.t. outliers of detection and initialisation of seasonal components ({pull}90[#90])
3232
Explicit change point detection and modelling ({pull}92[#92])
3333

34+
Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing
35+
model state on disk ({pull}89[#89])
36+
3437
=== Bug Fixes
3538

3639
Age seasonal components in proportion to the fraction of values with which they're updated ({pull}88[#88])

include/api/CForecastRunner.h

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <model/CForecastDataSink.h>
2222
#include <model/CResourceMonitor.h>
2323

24+
#include <boost/filesystem.hpp>
2425
#include <boost/unordered_set.hpp>
2526

2627
#include <condition_variable>
@@ -68,10 +69,22 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
6869
static const size_t DEFAULT_EXPIRY_TIME = 14 * core::constants::DAY;
6970

7071
//! max memory allowed to use for forecast models
71-
static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520; // 20MB
72+
static const size_t MAX_FORECAST_MODEL_MEMORY = 20971520ull; // 20MB
73+
74+
//! Note: This value measures the size in memory, not the size of the persistence,
75+
//! which is likely higher and would be hard to calculate upfront
76+
//! max memory allowed to use for forecast models persisting to disk
77+
static const size_t MAX_FORECAST_MODEL_PERSISTANCE_MEMORY = 524288000ull; // 500MB
78+
79+
//! Note: This value is lower than on X-pack side to prevent side-effects,
80+
//! if you change this value also change the limit on X-pack side.
81+
//! The purpose of this value is to guard the rest of the system regarding
82+
//! an out of disk space
83+
//! minimum disk space required for disk persistence
84+
static const size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB
7285

7386
//! minimum time between stat updates to prevent to many updates in a short time
74-
static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000; // 3s
87+
static const uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s
7588

7689
private:
7790
static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE;
@@ -82,6 +95,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
8295
static const std::string ERROR_NO_CREATE_TIME;
8396
static const std::string ERROR_BAD_MEMORY_STATUS;
8497
static const std::string ERROR_MEMORY_LIMIT;
98+
static const std::string ERROR_MEMORY_LIMIT_DISK;
99+
static const std::string ERROR_MEMORY_LIMIT_DISKSPACE;
85100
static const std::string ERROR_NOT_SUPPORTED_FOR_POPULATION_MODELS;
86101
static const std::string ERROR_NO_SUPPORTED_FUNCTIONS;
87102
static const std::string WARNING_DURATION_LIMIT;
@@ -100,6 +115,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
100115
using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;
101116
using TForecastResultSeries = model::CForecastDataSink::SForecastResultSeries;
102117
using TForecastResultSeriesVec = std::vector<TForecastResultSeries>;
118+
using TMathsModelPtr = std::shared_ptr<maths::CModel>;
103119

104120
using TStrUSet = boost::unordered_set<std::string>;
105121

@@ -189,6 +205,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
189205

190206
//! A collection storing important messages from forecasting
191207
TStrUSet s_Messages;
208+
209+
//! A directory to persist models on disk
210+
std::string s_TemporaryFolder;
192211
};
193212

194213
private:
@@ -202,6 +221,9 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
202221
//! Check for new jobs, blocks while waiting
203222
bool tryGetJob(SForecast& forecastJob);
204223

224+
//! check for sufficient disk space
225+
bool sufficientAvailableDiskSpace(const boost::filesystem::path& path);
226+
205227
//! pushes new jobs into the internal 'queue' (thread boundary)
206228
bool push(SForecast& forecastJob);
207229

include/core/RestoreMacros.h

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,23 @@ namespace core {
3939
continue; \
4040
}
4141

42+
#define RESTORE_ENUM(tag, target, enumtype) \
43+
if (name == tag) { \
44+
int value; \
45+
if (core::CStringUtils::stringToType(traverser.value(), value) == false) { \
46+
LOG_ERROR(<< "Failed to restore " #tag ", got " << traverser.value()); \
47+
return false; \
48+
} \
49+
target = enumtype(value); \
50+
continue; \
51+
}
52+
53+
#define RESTORE_ENUM_CHECKED(tag, target, enumtype, restoreSuccess) \
54+
if (name == tag) { \
55+
restoreSuccess = true; \
56+
RESTORE_ENUM(tag, target, enumtype) \
57+
}
58+
4259
#define RESTORE_SETUP_TEARDOWN(tag, setup, restore, teardown) \
4360
if (name == tag) { \
4461
setup; \

include/model/CAnomalyDetector.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,9 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
234234
CForecastDataSink::SForecastModelPrerequisites getForecastPrerequisites() const;
235235

236236
//! Generate maths models for forecasting
237-
CForecastDataSink::SForecastResultSeries getForecastModels() const;
237+
CForecastDataSink::SForecastResultSeries
238+
getForecastModels(bool persistOnDisk = false,
239+
const std::string& persistenceFolder = EMPTY_STRING) const;
238240

239241
//! Remove dead models, i.e. those models that have more-or-less
240242
//! reverted back to their non-informative state. BE CAREFUL WHEN

include/model/CCountingModelFactory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory {
121121
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
122122
//@}
123123

124+
//! Get the minimum seasonal variance scale
125+
virtual double minimumSeasonalVarianceScale() const;
126+
124127
private:
125128
//! Get the field values which partition the data for modeling.
126129
virtual TStrCRefVec partitioningFields() const;

include/model/CEventRateModelFactory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory {
131131
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
132132
//@}
133133

134+
//! Get the minimum seasonal variance scale
135+
virtual double minimumSeasonalVarianceScale() const;
136+
134137
private:
135138
//! Get the field values which partition the data for modeling.
136139
virtual TStrCRefVec partitioningFields() const;

include/model/CEventRatePopulationModelFactory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ class MODEL_EXPORT CEventRatePopulationModelFactory : public CModelFactory {
133133
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
134134
//@}
135135

136+
//! Get the minimum seasonal variance scale
137+
virtual double minimumSeasonalVarianceScale() const;
138+
136139
private:
137140
//! Get the field values which partition the data for modeling.
138141
virtual TStrCRefVec partitioningFields() const;

include/model/CForecastDataSink.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include <maths/CModel.h>
1616

17+
#include <model/CModelParams.h>
1718
#include <model/ImportExport.h>
1819
#include <model/ModelTypes.h>
1920

@@ -38,7 +39,7 @@ namespace model {
3839
//! to change (e.g. the json writing should not happen in this class).
3940
class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable {
4041
public:
41-
using TMathsModelPtr = std::unique_ptr<maths::CModel>;
42+
using TMathsModelPtr = std::shared_ptr<maths::CModel>;
4243
using TStrUMap = boost::unordered_set<std::string>;
4344

4445
//! Wrapper for 1 timeseries model, its feature and by Field
@@ -59,18 +60,21 @@ class MODEL_EXPORT CForecastDataSink final : private core::CNonCopyable {
5960

6061
//! Everything that defines 1 series of forecasts
6162
struct MODEL_EXPORT SForecastResultSeries {
62-
SForecastResultSeries();
63+
SForecastResultSeries(const SModelParams& modelParams);
6364

6465
SForecastResultSeries(SForecastResultSeries&& other);
6566

6667
SForecastResultSeries(const SForecastResultSeries& that) = delete;
6768
SForecastResultSeries& operator=(const SForecastResultSeries&) = delete;
6869

70+
SModelParams s_ModelParams;
6971
int s_DetectorIndex;
7072
std::vector<SForecastModelWrapper> s_ToForecast;
73+
std::string s_ToForecastPersisted;
7174
std::string s_PartitionFieldName;
7275
std::string s_PartitionFieldValue;
7376
std::string s_ByFieldName;
77+
double s_MinimumSeasonalVarianceScale;
7478
};
7579

7680
//! \brief Data describing prerequisites prior predictions

include/model/CForecastModelPersist.h

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
#ifndef INCLUDED_ml_model_CForecastModelPersist_h
8+
#define INCLUDED_ml_model_CForecastModelPersist_h
9+
10+
#include <core/CJsonStatePersistInserter.h>
11+
#include <core/CJsonStateRestoreTraverser.h>
12+
13+
#include <maths/CModel.h>
14+
15+
#include <model/CModelParams.h>
16+
#include <model/ImportExport.h>
17+
#include <model/ModelTypes.h>
18+
19+
#include <boost/filesystem.hpp>
20+
21+
#include <fstream>
22+
#include <memory>
23+
24+
namespace ml {
25+
namespace model {
26+
27+
//! \brief Persist/Restore CModel sub-classes to/from text representations for
28+
//! the purpose of forecasting.
29+
//!
30+
//! DESCRIPTION:\n
31+
//! Persists/Restores models to disk for the purpose of restoring and forecasting
32+
//! on them.
33+
//!
34+
//! IMPLEMENTATION DECISIONS:\n
35+
//! Only as complete as required for forecasting.
36+
//!
37+
//! Persist and Restore are only done to avoid heap memory usage using temporary disk space.
38+
//! No need for backwards compatibility and version'ing as code will only be used
39+
//! locally never leaving process/io boundaries.
40+
class MODEL_EXPORT CForecastModelPersist final {
41+
public:
42+
using TMathsModelPtr = std::shared_ptr<maths::CModel>;
43+
44+
public:
45+
class MODEL_EXPORT CPersist final {
46+
public:
47+
explicit CPersist(const std::string& temporaryPath);
48+
49+
//! add a model to the persistence
50+
void addModel(const maths::CModel* model,
51+
const model_t::EFeature feature,
52+
const std::string& byFieldValue);
53+
54+
//! close the outputStream
55+
const std::string& finalizePersistAndGetFile();
56+
57+
private:
58+
static void persistOneModel(core::CStatePersistInserter& inserter,
59+
const maths::CModel* model,
60+
const model_t::EFeature feature,
61+
const std::string& byFieldValue);
62+
63+
private:
64+
//! the filename where to persist to
65+
boost::filesystem::path m_FileName;
66+
67+
//! the actual file where it models are persisted to
68+
std::ofstream m_OutStream;
69+
70+
//! number of models persisted
71+
size_t m_ModelCount;
72+
};
73+
74+
class MODEL_EXPORT CRestore final {
75+
public:
76+
explicit CRestore(const SModelParams& modelParams,
77+
double minimumSeasonalVarianceScale,
78+
const std::string& fileName);
79+
80+
//! add a model to the persistence
81+
bool nextModel(TMathsModelPtr& model, model_t::EFeature& feature, std::string& byFieldValue);
82+
83+
private:
84+
static bool restoreOneModel(core::CStateRestoreTraverser& traverser,
85+
SModelParams modelParams,
86+
double inimumSeasonalVarianceScale,
87+
TMathsModelPtr& model,
88+
model_t::EFeature& feature,
89+
std::string& byFieldValue);
90+
91+
private:
92+
//! model parameters required in order to restore the model
93+
SModelParams m_ModelParams;
94+
95+
//! minimum seasonal variance scale specific to the model
96+
double m_MinimumSeasonalVarianceScale;
97+
98+
//! the actual file where it models are persisted to
99+
std::ifstream m_InStream;
100+
101+
//! the persist inserter
102+
core::CJsonStateRestoreTraverser m_RestoreTraverser;
103+
}; // class CRestore
104+
}; // class CForecastModelPersist
105+
}
106+
}
107+
108+
#endif // INCLUDED_ml_model_CForecastModelPersist_h

include/model/CMetricModelFactory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ class MODEL_EXPORT CMetricModelFactory : public CModelFactory {
134134
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
135135
//@}
136136

137+
//! Get the minimum seasonal variance scale
138+
virtual double minimumSeasonalVarianceScale() const;
139+
137140
private:
138141
//! Get the field values which partition the data for modeling.
139142
virtual TStrCRefVec partitioningFields() const;

include/model/CMetricPopulationModelFactory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ class MODEL_EXPORT CMetricPopulationModelFactory : public CModelFactory {
133133
virtual void bucketResultsDelay(std::size_t bucketResultsDelay);
134134
//@}
135135

136+
//! Get the minimum seasonal variance scale
137+
virtual double minimumSeasonalVarianceScale() const;
138+
136139
private:
137140
//! Get the field values which partition the data for modeling.
138141
virtual TStrCRefVec partitioningFields() const;

include/model/CModelFactory.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ class MODEL_EXPORT CModelFactory {
346346
//! component.
347347
std::size_t componentSize() const;
348348

349+
//! Get the minimum seasonal variance scale, specific to the model
350+
virtual double minimumSeasonalVarianceScale() const = 0;
351+
349352
protected:
350353
using TMultivariatePriorPtrVec = std::vector<TMultivariatePriorPtr>;
351354
using TOptionalSearchKey = boost::optional<CSearchKey>;

0 commit comments

Comments
 (0)