Skip to content

Commit fae7f38

Browse files
authored
[ML] Improve the accuracy of model memory control (#122)
1 parent a5edcfa commit fae7f38

35 files changed

+622
-891
lines changed

docs/CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Improve behavior when there are abrupt changes in the seasonal components presen
3636
Explicit change point detection and modelling ({pull}92[#92])
3737
Improve partition analysis memory usage ({pull}97[#97])
3838
Reduce model memory by storing state for periodicity testing in a compressed format ({pull}100[#100])
39+
Improve the accuracy of model memory control ({pull}122[#122])
3940

4041
Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing
4142
model state on disk ({pull}89[#89])

include/api/CAnomalyJob.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,8 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
108108
public:
109109
using TPersistCompleteFunc =
110110
std::function<void(const CModelSnapshotJsonWriter::SModelSnapshotReport&)>;
111-
using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
111+
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
112112
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;
113-
using TAnomalyDetectorPtrVecItr = std::vector<TAnomalyDetectorPtr>::iterator;
114-
using TAnomalyDetectorPtrVecCItr = std::vector<TAnomalyDetectorPtr>::const_iterator;
115113
using TKeyVec = std::vector<model::CSearchKey>;
116114
using TKeyAnomalyDetectorPtrUMap =
117115
boost::unordered_map<model::CSearchKey::TStrKeyPr, TAnomalyDetectorPtr, model::CStrKeyPrHash, model::CStrKeyPrEqual>;
@@ -359,7 +357,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
359357
//! Update configuration
360358
void doForecast(const std::string& controlMessage);
361359

362-
model::CAnomalyDetector::TAnomalyDetectorPtr
360+
TAnomalyDetectorPtr
363361
makeDetector(int identifier,
364362
const model::CAnomalyDetectorModelConfig& modelConfig,
365363
model::CLimits& limits,

include/api/CForecastRunner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
109109
using TOStreamConcurrentWrapper = core::CConcurrentWrapper<std::ostream>;
110110
using TOStreamConcurrentWrapperPtr = std::shared_ptr<TOStreamConcurrentWrapper>;
111111

112-
using TAnomalyDetectorPtr = model::CAnomalyDetector::TAnomalyDetectorPtr;
112+
using TAnomalyDetectorPtr = std::shared_ptr<model::CAnomalyDetector>;
113113
using TAnomalyDetectorPtrVec = std::vector<TAnomalyDetectorPtr>;
114114

115115
using TForecastModelWrapper = model::CForecastDataSink::SForecastModelWrapper;

include/model/CAnomalyDetector.h

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,9 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
7070
using TStrVec = std::vector<std::string>;
7171
using TStrCPtrVec = std::vector<const std::string*>;
7272
using TModelPlotDataVec = std::vector<CModelPlotData>;
73-
7473
using TDataGathererPtr = std::shared_ptr<CDataGatherer>;
7574
using TModelFactoryCPtr = std::shared_ptr<const CModelFactory>;
7675
using TModelPtr = std::unique_ptr<CAnomalyDetectorModel>;
77-
78-
//! A shared pointer to an instance of this class
79-
using TAnomalyDetectorPtr = std::shared_ptr<CAnomalyDetector>;
80-
8176
using TOutputModelPlotDataFunc =
8277
std::function<void(const std::string&, const std::string&, const std::string&, const std::string&, const CModelPlotData&)>;
8378
using TStrSet = CAnomalyDetectorModelConfig::TStrSet;
@@ -334,14 +329,13 @@ class MODEL_EXPORT CAnomalyDetector : private core::CNonCopyable {
334329
//! in the model ensemble class.
335330
void legacyModelsAcceptPersistInserter(core::CStatePersistInserter& inserter) const;
336331

337-
protected:
338-
//! Configurable limits
339-
CLimits& m_Limits;
340-
341332
private:
342333
//! An identifier for the search for which this is detecting anomalies.
343334
int m_DetectorIndex;
344335

336+
//! Configurable limits
337+
CLimits& m_Limits;
338+
345339
//! Configurable behaviour
346340
const CAnomalyDetectorModelConfig& m_ModelConfig;
347341

include/model/CDataGatherer.h

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -108,36 +108,24 @@ class MODEL_EXPORT CDataGatherer {
108108
using TSizeUInt64Pr = std::pair<std::size_t, uint64_t>;
109109
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
110110
using TFeatureVec = model_t::TFeatureVec;
111-
using TFeatureVecCItr = TFeatureVec::const_iterator;
112111
using TSizeSizePr = std::pair<std::size_t, std::size_t>;
113112
using TSizeSizePrUInt64Pr = std::pair<TSizeSizePr, uint64_t>;
114113
using TSizeSizePrUInt64PrVec = std::vector<TSizeSizePrUInt64Pr>;
115114
using TSizeSizePrUInt64UMap = boost::unordered_map<TSizeSizePr, uint64_t>;
116-
using TSizeSizePrUInt64UMapItr = TSizeSizePrUInt64UMap::iterator;
117-
using TSizeSizePrUInt64UMapCItr = TSizeSizePrUInt64UMap::const_iterator;
118115
using TSizeSizePrUInt64UMapQueue = CBucketQueue<TSizeSizePrUInt64UMap>;
119-
using TSizeSizePrUInt64UMapQueueItr = TSizeSizePrUInt64UMapQueue::iterator;
120-
using TSizeSizePrUInt64UMapQueueCItr = TSizeSizePrUInt64UMapQueue::const_iterator;
121-
using TSizeSizePrUInt64UMapQueueCRItr = TSizeSizePrUInt64UMapQueue::const_reverse_iterator;
122116
using TSizeSizePrStoredStringPtrPrUInt64UMap = CBucketGatherer::TSizeSizePrStoredStringPtrPrUInt64UMap;
123-
using TSizeSizePrStoredStringPtrPrUInt64UMapCItr =
124-
TSizeSizePrStoredStringPtrPrUInt64UMap::const_iterator;
125-
using TSizeSizePrStoredStringPtrPrUInt64UMapItr =
126-
TSizeSizePrStoredStringPtrPrUInt64UMap::iterator;
127117
using TSizeSizePrStoredStringPtrPrUInt64UMapVec =
128118
std::vector<TSizeSizePrStoredStringPtrPrUInt64UMap>;
129119
using TSizeSizePrStoredStringPtrPrUInt64UMapVecQueue =
130120
CBucketQueue<TSizeSizePrStoredStringPtrPrUInt64UMapVec>;
131121
using TSearchKeyCRef = boost::reference_wrapper<const CSearchKey>;
132-
using TBucketGathererPVec = std::vector<CBucketGatherer*>;
133-
using TBucketGathererPVecItr = TBucketGathererPVec::iterator;
134-
using TBucketGathererPVecCItr = TBucketGathererPVec::const_iterator;
122+
using TBucketGathererPtr = std::unique_ptr<CBucketGatherer>;
123+
using TBucketGathererPtrVec = std::vector<TBucketGathererPtr>;
135124
using TFeatureAnyPr = std::pair<model_t::EFeature, boost::any>;
136125
using TFeatureAnyPrVec = std::vector<TFeatureAnyPr>;
137126
using TMetricCategoryVec = std::vector<model_t::EMetricCategory>;
138-
using TSampleCountsPtr = std::shared_ptr<CSampleCounts>;
127+
using TSampleCountsPtr = std::unique_ptr<CSampleCounts>;
139128
using TTimeVec = std::vector<core_t::TTime>;
140-
using TTimeVecCItr = TTimeVec::const_iterator;
141129

142130
public:
143131
//! The summary count indicating an explicit null record.
@@ -161,8 +149,6 @@ class MODEL_EXPORT CDataGatherer {
161149
//! \param[in] modelParams The global configuration parameters.
162150
//! \param[in] summaryCountFieldName If \p summaryMode is E_Manual
163151
//! then this is the name of the field holding the summary count.
164-
//! \param[in] partitionFieldName The name of the field which splits
165-
//! the data.
166152
//! \param[in] partitionFieldValue The value of the field which splits
167153
//! the data.
168154
//! \param[in] personFieldName The name of the field which identifies
@@ -173,8 +159,6 @@ class MODEL_EXPORT CDataGatherer {
173159
//! the metric values.
174160
//! \param[in] influenceFieldNames The field names for which we will
175161
//! compute influences.
176-
//! \param[in] useNull If true the gatherer will process missing
177-
//! person and attribute field values (assuming they are empty).
178162
//! \param[in] key The key of the search for which to gather data.
179163
//! \param[in] features The features of the data to model.
180164
//! \param[in] startTime The start of the time interval for which
@@ -187,13 +171,11 @@ class MODEL_EXPORT CDataGatherer {
187171
model_t::ESummaryMode summaryMode,
188172
const SModelParams& modelParams,
189173
const std::string& summaryCountFieldName,
190-
const std::string& partitionFieldName,
191174
const std::string& partitionFieldValue,
192175
const std::string& personFieldName,
193176
const std::string& attributeFieldName,
194177
const std::string& valueFieldName,
195178
const TStrVec& influenceFieldNames,
196-
bool useNull,
197179
const CSearchKey& key,
198180
const TFeatureVec& features,
199181
core_t::TTime startTime,
@@ -204,13 +186,11 @@ class MODEL_EXPORT CDataGatherer {
204186
model_t::ESummaryMode summaryMode,
205187
const SModelParams& modelParams,
206188
const std::string& summaryCountFieldName,
207-
const std::string& partitionFieldName,
208189
const std::string& partitionFieldValue,
209190
const std::string& personFieldName,
210191
const std::string& attributeFieldName,
211192
const std::string& valueFieldName,
212193
const TStrVec& influenceFieldNames,
213-
bool useNull,
214194
const CSearchKey& key,
215195
core::CStateRestoreTraverser& traverser);
216196

@@ -220,8 +200,9 @@ class MODEL_EXPORT CDataGatherer {
220200
//! redundant except to create a signature that will not be mistaken for
221201
//! a general purpose copy constructor.
222202
CDataGatherer(bool isForPersistence, const CDataGatherer& other);
223-
224203
~CDataGatherer();
204+
CDataGatherer(const CDataGatherer&) = delete;
205+
CDataGatherer& operator=(const CDataGatherer&) = delete;
225206
//@}
226207

227208
//! \name Persistence
@@ -546,7 +527,7 @@ class MODEL_EXPORT CDataGatherer {
546527
void resetSampleCount(std::size_t id);
547528

548529
//! Get the sample counts.
549-
TSampleCountsPtr sampleCounts() const;
530+
const TSampleCountsPtr& sampleCounts() const;
550531
//@}
551532

552533
//! \name Time
@@ -759,7 +740,7 @@ class MODEL_EXPORT CDataGatherer {
759740

760741
//! The collection of bucket gatherers which contain the bucket-specific
761742
//! metrics and counts.
762-
TBucketGathererPVec m_Gatherers;
743+
TBucketGathererPtrVec m_Gatherers;
763744

764745
//! Indicates whether the data being gathered are already summarized
765746
//! by an external aggregation process.
@@ -768,15 +749,12 @@ class MODEL_EXPORT CDataGatherer {
768749
//! The global configuration parameters.
769750
TModelParamsCRef m_Params;
770751

771-
//! The partition field name or an empty string if there isn't one.
772-
std::string m_PartitionFieldName;
752+
//! The key of the search for which data is being gathered.
753+
TSearchKeyCRef m_SearchKey;
773754

774755
//! The value of the partition field for this detector.
775756
core::CStoredStringPtr m_PartitionFieldValue;
776757

777-
//! The key of the search for which data is being gathered.
778-
TSearchKeyCRef m_SearchKey;
779-
780758
//! A registry where person names are mapped to unique IDs.
781759
CDynamicStringIdRegistry m_PeopleRegistry;
782760

include/model/CLimits.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,7 @@ class MODEL_EXPORT CLimits {
6464

6565
public:
6666
//! Default constructor
67-
CLimits();
68-
69-
//! Default destructor
70-
~CLimits();
67+
explicit CLimits(double byteLimitMargin = CResourceMonitor::DEFAULT_BYTE_LIMIT_MARGIN);
7168

7269
//! Initialise from a config file. This overwrites current settings
7370
//! with any found in the config file. Settings that are not present

include/model/CResourceMonitor.h

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
#include <model/ImportExport.h>
1212
#include <model/ModelTypes.h>
1313

14+
#include <boost/unordered_map.hpp>
15+
1416
#include <functional>
15-
#include <map>
1617

1718
class CResourceMonitorTest;
1819
class CResourceLimitTest;
@@ -42,20 +43,21 @@ class MODEL_EXPORT CResourceMonitor {
4243
};
4344

4445
public:
45-
using TModelPtrSizePr = std::pair<CAnomalyDetectorModel*, std::size_t>;
46-
using TModelPtrSizeMap = std::map<CAnomalyDetectorModel*, std::size_t>;
46+
using TDetectorPtrSizePr = std::pair<CAnomalyDetector*, std::size_t>;
47+
using TDetectorPtrSizeUMap = boost::unordered_map<CAnomalyDetector*, std::size_t>;
4748
using TMemoryUsageReporterFunc = std::function<void(const CResourceMonitor::SResults&)>;
4849
using TTimeSizeMap = std::map<core_t::TTime, std::size_t>;
4950

5051
//! The minimum time between prunes
5152
static const core_t::TTime MINIMUM_PRUNE_FREQUENCY;
52-
5353
//! Default memory limit for resource monitor
5454
static const std::size_t DEFAULT_MEMORY_LIMIT_MB;
55+
//! The initial byte limit margin to use if none is supplied
56+
static const double DEFAULT_BYTE_LIMIT_MARGIN;
5557

5658
public:
5759
//! Default constructor
58-
CResourceMonitor();
60+
explicit CResourceMonitor(double byteLimitMargin = DEFAULT_BYTE_LIMIT_MARGIN);
5961

6062
//! Query the resource monitor to find out if the models are
6163
//! taking up too much memory and further allocations should be banned
@@ -127,13 +129,21 @@ class MODEL_EXPORT CResourceMonitor {
127129
//! Clears all extra memory
128130
void clearExtraMemory();
129131

132+
//! Decrease the margin on the memory limit.
133+
//!
134+
//! We start off applying a margin to the memory limit because
135+
//! it is difficult to accurately estimate the long term memory
136+
//! usage at this point. This is gradually decreased over time
137+
//! by calling this once per bucket processed.
138+
void decreaseMargin(core_t::TTime elapsedTime);
139+
130140
private:
131141
//! Updates the memory limit fields and the prune threshold
132142
//! to the given value.
133143
void updateMemoryLimitsAndPruneThreshold(std::size_t limitMBs);
134144

135145
//! Update the given detector and recalculate the total usage
136-
void memUsage(CAnomalyDetectorModel* model);
146+
void memUsage(CAnomalyDetector* detector);
137147

138148
//! Determine if we need to send a usage report, based on
139149
//! increased usage, or increased errors
@@ -143,16 +153,25 @@ class MODEL_EXPORT CResourceMonitor {
143153
//! should be allowed or not
144154
void updateAllowAllocations();
145155

156+
//! Get the high memory limit with margin applied.
157+
std::size_t highLimit() const;
158+
159+
//! Get the low memory limit with margin applied.
160+
std::size_t lowLimit() const;
161+
146162
//! Returns the sum of used memory plus any extra memory
147163
std::size_t totalMemory() const;
148164

149165
private:
150166
//! The registered collection of components
151-
TModelPtrSizeMap m_Models;
167+
TDetectorPtrSizeUMap m_Detectors;
152168

153169
//! Is there enough free memory to allow creating new components
154170
bool m_AllowAllocations;
155171

172+
//! The relative margin to apply to the byte limits.
173+
double m_ByteLimitMargin;
174+
156175
//! The upper limit for memory usage, checked on increasing values
157176
std::size_t m_ByteLimitHigh;
158177

lib/api/CAnomalyJob.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ void CAnomalyJob::outputBucketResultsUntil(core_t::TTime time) {
413413
lastBucketEndTime + bucketLength + latency <= time;
414414
lastBucketEndTime += effectiveBucketLength) {
415415
this->outputResults(lastBucketEndTime);
416+
m_Limits.resourceMonitor().decreaseMargin(bucketLength);
416417
m_Limits.resourceMonitor().sendMemoryUsageReportIfSignificantlyChanged(lastBucketEndTime);
417418
m_LastFinalisedBucketEndTime = lastBucketEndTime + effectiveBucketLength;
418419

@@ -1403,7 +1404,7 @@ CAnomalyJob::detectorForKey(bool isRestoring,
14031404
// Check if we need to and are allowed to create a new detector.
14041405
if (itr == m_Detectors.end() && resourceMonitor.areAllocationsAllowed()) {
14051406
// Create a placeholder for the anomaly detector.
1406-
model::CAnomalyDetector::TAnomalyDetectorPtr& detector =
1407+
TAnomalyDetectorPtr& detector =
14071408
m_Detectors
14081409
.emplace(model::CSearchKey::TStrKeyPr(partition, key), TAnomalyDetectorPtr())
14091410
.first->second;
@@ -1450,7 +1451,7 @@ void CAnomalyJob::pruneAllModels() {
14501451
}
14511452
}
14521453

1453-
model::CAnomalyDetector::TAnomalyDetectorPtr
1454+
CAnomalyJob::TAnomalyDetectorPtr
14541455
CAnomalyJob::makeDetector(int identifier,
14551456
const model::CAnomalyDetectorModelConfig& modelConfig,
14561457
model::CLimits& limits,

0 commit comments

Comments
 (0)