Skip to content

[ML] Improve partition analysis memory usage #97

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Improve and use periodic boundary condition for seasonal component modeling ({pu
Improve robustness w.r.t. outliers of detection and initialisation of seasonal components ({pull}90[#90])
Improve behavior when there are abrupt changes in the seasonal components present in a time series ({pull}91[#91])
Explicit change point detection and modelling ({pull}92[#92])
Improve partition analysis memory usage ({pull}97[#97])

Forecasting of Machine Learning job time series is now supported for large jobs by temporarily storing
model state on disk ({pull}89[#89])
Expand Down
4 changes: 4 additions & 0 deletions include/api/CAnomalyJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <model/CHierarchicalResults.h>
#include <model/CHierarchicalResultsAggregator.h>
#include <model/CHierarchicalResultsNormalizer.h>
#include <model/CInterimBucketCorrector.h>
#include <model/CResourceMonitor.h>
#include <model/CResultsQueue.h>
#include <model/CSearchKey.h>
Expand Down Expand Up @@ -131,6 +132,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
const TModelPlotDataVecQueue& modelPlotQueue,
core_t::TTime time,
const model::CResourceMonitor::SResults& modelSizeStats,
const model::CInterimBucketCorrector& interimBucketCorrector,
const model::CHierarchicalResultsAggregator& aggregator,
core_t::TTime latestRecordTime,
core_t::TTime lastResultsTime);
Expand All @@ -139,6 +141,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
TModelPlotDataVecQueue s_ModelPlotQueue;
core_t::TTime s_Time;
model::CResourceMonitor::SResults s_ModelSizeStats;
model::CInterimBucketCorrector s_InterimBucketCorrector;
model::CHierarchicalResultsAggregator s_Aggregator;
std::string s_NormalizerState;
core_t::TTime s_LatestRecordTime;
Expand Down Expand Up @@ -263,6 +266,7 @@ class API_EXPORT CAnomalyJob : public CDataProcessor {
core_t::TTime time,
const TKeyCRefAnomalyDetectorPtrPrVec& detectors,
const model::CResourceMonitor::SResults& modelSizeStats,
const model::CInterimBucketCorrector& interimBucketCorrector,
const model::CHierarchicalResultsAggregator& aggregator,
const std::string& normalizerState,
core_t::TTime latestRecordTime,
Expand Down
18 changes: 2 additions & 16 deletions include/model/CAnomalyDetectorModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,8 @@ class MODEL_EXPORT CAnomalyDetectorModel : private core::CNonCopyable {
//! Clear out large state objects for people/attributes that are pruned
virtual void clearPrunedResources(const TSizeVec& people, const TSizeVec& attributes) = 0;

//! Get the objects which calculates corrections for interim buckets.
const CInterimBucketCorrector& interimValueCorrector() const;
//! Get the object which calculates corrections for interim buckets.
virtual const CInterimBucketCorrector& interimValueCorrector() const = 0;

//! Check if any of the sample-filtering detection rules apply to this series.
bool shouldIgnoreSample(model_t::EFeature feature,
Expand All @@ -675,25 +675,14 @@ class MODEL_EXPORT CAnomalyDetectorModel : private core::CNonCopyable {
//! Get the non-estimated value of the the memory used by this model.
virtual std::size_t computeMemoryUsage() const = 0;

//! Restore interim bucket corrector.
bool interimBucketCorrectorAcceptRestoreTraverser(core::CStateRestoreTraverser& traverser);

//! Persist the interim bucket corrector.
void interimBucketCorrectorAcceptPersistInserter(const std::string& tag,
core::CStatePersistInserter& inserter) const;

//! Create a stub version of maths::CModel for use when pruning people
//! or attributes to free memory resource.
static maths::CModel* tinyModel();

private:
using TModelParamsCRef = boost::reference_wrapper<const SModelParams>;
using TInterimBucketCorrectorPtr = std::shared_ptr<CInterimBucketCorrector>;

private:
//! Set the current bucket total count.
virtual void currentBucketTotalCount(uint64_t totalCount) = 0;

//! Skip sampling the interval \p endTime - \p startTime.
virtual void doSkipSampling(core_t::TTime startTime, core_t::TTime endTime) = 0;

Expand All @@ -718,9 +707,6 @@ class MODEL_EXPORT CAnomalyDetectorModel : private core::CNonCopyable {
//! The influence calculators to use for each feature which is being
//! modeled.
TFeatureInfluenceCalculatorCPtrPrVecVec m_InfluenceCalculators;

//! A corrector that calculates adjustments for values of interim buckets.
TInterimBucketCorrectorPtr m_InterimBucketCorrector;
};
}
}
Expand Down
10 changes: 10 additions & 0 deletions include/model/CAnomalyDetectorModelConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
namespace ml {
namespace model {
class CDetectionRule;
class CInterimBucketCorrector;
class CSearchKey;
class CModelAutoConfigurer;
class CModelFactory;
Expand Down Expand Up @@ -68,6 +69,7 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
using TFeatureVec = model_t::TFeatureVec;
using TStrVec = std::vector<std::string>;
using TStrVecCItr = TStrVec::const_iterator;
using TInterimBucketCorrectorPtr = std::shared_ptr<CInterimBucketCorrector>;
using TModelFactoryPtr = std::shared_ptr<CModelFactory>;
using TModelFactoryCPtr = std::shared_ptr<const CModelFactory>;
using TFactoryTypeFactoryPtrMap = std::map<EFactoryType, TModelFactoryPtr>;
Expand Down Expand Up @@ -276,6 +278,8 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! Set the number of buckets to delay finalizing out-of-phase buckets.
void bucketResultsDelay(std::size_t delay);

//! Set the single interim bucket correction calculator.
void interimBucketCorrector(const TInterimBucketCorrectorPtr& interimBucketCorrector);
//! Set whether multivariate analysis of correlated 'by' fields should
//! be performed.
void multivariateByFields(bool enabled);
Expand Down Expand Up @@ -369,6 +373,9 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! Get the multiple bucket lengths.
const TTimeVec& multipleBucketLengths() const;

//! Get the single interim bucket correction calculator.
const CInterimBucketCorrector& interimBucketCorrector() const;

//! Should multivariate analysis of correlated 'by' fields be performed?
bool multivariateByFields() const;

Expand Down Expand Up @@ -453,6 +460,9 @@ class MODEL_EXPORT CAnomalyDetectorModelConfig {
//! Should multivariate analysis of correlated 'by' fields be performed?
bool m_MultivariateByFields;

//! The single interim bucket correction calculator.
TInterimBucketCorrectorPtr m_InterimBucketCorrector;

//! The new model factories for each data type.
TFactoryTypeFactoryPtrMap m_Factories;

Expand Down
35 changes: 23 additions & 12 deletions include/model/CCountingModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include <boost/unordered_map.hpp>

#include <memory>

class CCountingModelTest;

namespace ml {
Expand All @@ -30,19 +32,27 @@ namespace model {
//! memory limiting can cause us to stop getting counts and also makes
//! interpreting the maths library logging easier.
class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
public:
using TInterimBucketCorrectorPtr = std::shared_ptr<CInterimBucketCorrector>;

public:
//! \name Life-cycle.
//@{
//! \param[in] params The global configuration parameters.
//! \param[in] dataGatherer The object that gathers time series data.
CCountingModel(const SModelParams& params, const TDataGathererPtr& dataGatherer);
//! \param[in] interimBucketCorrector Calculates corrections for interim
//! buckets.
CCountingModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TInterimBucketCorrectorPtr& interimBucketCorrector);

//! Constructor used for restoring persisted models.
//!
//! \note The current bucket statistics are left default initialized
//! and so must be sampled for before this model can be used.
CCountingModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TInterimBucketCorrectorPtr& interimBucketCorrector,
core::CStateRestoreTraverser& traverser);

//! Create a copy that will result in the same persisted state as the
Expand Down Expand Up @@ -213,12 +223,6 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
//! Get the descriptions of any occurring scheduled event descriptions for the bucket time
virtual const TStr1Vec& scheduledEventDescriptions(core_t::TTime time) const;

public:
using TSizeUInt64Pr = std::pair<std::size_t, uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TMeanAccumulator = maths::CBasicStatistics::SSampleMean<double>::TAccumulator;
using TMeanAccumulatorVec = std::vector<TMeanAccumulator>;

protected:
//! Get the start time of the current bucket.
virtual core_t::TTime currentBucketStartTime() const;
Expand Down Expand Up @@ -255,22 +259,26 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
//! Initialize the time series models for newly observed people.
virtual void clearPrunedResources(const TSizeVec& people, const TSizeVec& attributes);

//! Get the object which calculates corrections for interim buckets.
virtual const CInterimBucketCorrector& interimValueCorrector() const;

//! Check if bucket statistics are available for the specified time.
bool bucketStatsAvailable(core_t::TTime time) const;

//! Print the current bucketing interval.
std::string printCurrentBucket() const;

//! Set the current bucket total count.
virtual void currentBucketTotalCount(uint64_t totalCount);

//! Perform derived class specific operations to accomplish skipping sampling
virtual void doSkipSampling(core_t::TTime startTime, core_t::TTime endTime);

//! Get the model memory usage estimator
virtual CMemoryUsageEstimator* memoryUsageEstimator() const;

private:
using TSizeUInt64Pr = std::pair<std::size_t, uint64_t>;
using TSizeUInt64PrVec = std::vector<TSizeUInt64Pr>;
using TMeanAccumulator = maths::CBasicStatistics::SSampleMean<double>::TAccumulator;
using TMeanAccumulatorVec = std::vector<TMeanAccumulator>;
using TTimeStr1VecUMap = boost::unordered_map<core_t::TTime, TStr1Vec>;

private:
Expand All @@ -283,12 +291,15 @@ class MODEL_EXPORT CCountingModel : public CAnomalyDetectorModel {
//! The baseline bucket counts.
TMeanAccumulatorVec m_MeanCounts;

//! Map of matched scheduled event descriptions by bucket time
//! Map of matched scheduled event descriptions by bucket time.
TTimeStr1VecUMap m_ScheduledEventDescriptions;

//! Calculates corrections for interim buckets.
TInterimBucketCorrectorPtr m_InterimBucketCorrector;

friend class ::CCountingModelTest;
};
}
}

#endif // INCLUDED_ml_model_CModel_h
#endif // INCLUDED_ml_model_CCountingModel_h
7 changes: 4 additions & 3 deletions include/model/CCountingModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ class MODEL_EXPORT CCountingModelFactory : public CModelFactory {
//! intended for unit testing and are not necessarily good defaults.
//! The CModelConfig class is responsible for providing sensible
//! default values for the factory for use within our products.
explicit CCountingModelFactory(const SModelParams& params,
model_t::ESummaryMode summaryMode = model_t::E_None,
const std::string& summaryCountFieldName = "");
CCountingModelFactory(const SModelParams& params,
const TInterimBucketCorrectorWPtr& interimBucketCorrector,
model_t::ESummaryMode summaryMode = model_t::E_None,
const std::string& summaryCountFieldName = "");

//! Create a copy of the factory owned by the calling code.
virtual CCountingModelFactory* clone() const;
Expand Down
21 changes: 12 additions & 9 deletions include/model/CEventRateModel.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
using TSizeFeatureDataPrVec = std::vector<TSizeFeatureDataPr>;
using TFeatureSizeFeatureDataPrVecPr = std::pair<model_t::EFeature, TSizeFeatureDataPrVec>;
using TFeatureSizeFeatureDataPrVecPrVec = std::vector<TFeatureSizeFeatureDataPrVecPr>;
using TInterimBucketCorrectorCPtr = std::shared_ptr<const CInterimBucketCorrector>;
using TCategoryProbabilityCache = CModelTools::CCategoryProbabilityCache;

//! The statistics we maintain about a bucketing interval.
Expand All @@ -70,8 +71,6 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
core_t::TTime s_StartTime;
//! The non-zero person counts in the current bucket.
TSizeUInt64PrVec s_PersonCounts;
//! The total count in the current bucket.
uint64_t s_TotalCount;
//! The feature data samples for the current bucketing interval.
TFeatureSizeFeatureDataPrVecPrVec s_FeatureData;
//! A cache of the corrections applied to interim results.
Expand All @@ -94,13 +93,16 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
//! of seeing the people we are modeling.
//! \param[in] influenceCalculators The influence calculators to use
//! for each feature.
//! \param[in] interimBucketCorrector Calculates corrections for interim
//! buckets.
CEventRateModel(const SModelParams& params,
const TDataGathererPtr& dataGatherer,
const TFeatureMathsModelPtrPrVec& newFeatureModels,
const TFeatureMultivariatePriorPtrPrVec& newFeatureCorrelateModelPriors,
const TFeatureCorrelationsPtrPrVec& featureCorrelatesModels,
const maths::CMultinomialConjugate& probabilityPrior,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators);
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
const TInterimBucketCorrectorCPtr& interimBucketCorrector);

//! Constructor used for restoring persisted models.
//!
Expand All @@ -112,6 +114,7 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
const TFeatureMultivariatePriorPtrPrVec& newFeatureCorrelateModelPriors,
const TFeatureCorrelationsPtrPrVec& featureCorrelatesModels,
const TFeatureInfluenceCalculatorCPtrPrVecVec& influenceCalculators,
const TInterimBucketCorrectorCPtr& interimBucketCorrector,
core::CStateRestoreTraverser& traverser);

//! Create a copy that will result in the same persisted state as the
Expand Down Expand Up @@ -277,18 +280,15 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
//! Get writable person counts in the current bucket.
virtual TSizeUInt64PrVec& currentBucketPersonCounts();

//! Set the current bucket total count.
virtual void currentBucketTotalCount(uint64_t totalCount);

//! Get the total count of the current bucket.
uint64_t currentBucketTotalCount() const;

//! Get the interim corrections of the current bucket.
TFeatureSizeSizeTripleDouble1VecUMap& currentBucketInterimCorrections() const;

//! Clear out large state objects for people that are pruned.
virtual void clearPrunedResources(const TSizeVec& people, const TSizeVec& attributes);

//! Get the object which calculates corrections for interim buckets.
virtual const CInterimBucketCorrector& interimValueCorrector() const;

//! Check if there are correlates for \p feature and the person
//! identified by \p pid.
bool correlates(model_t::EFeature feature, std::size_t pid, core_t::TTime time) const;
Expand Down Expand Up @@ -319,6 +319,9 @@ class MODEL_EXPORT CEventRateModel : public CIndividualModel {
//! rarity).
maths::CMultinomialConjugate m_ProbabilityPrior;

//! Calculates corrections for interim buckets.
TInterimBucketCorrectorCPtr m_InterimBucketCorrector;

//! A cache of the person probabilities as of the start of the
//! for the bucketing interval.
TCategoryProbabilityCache m_Probabilities;
Expand Down
15 changes: 8 additions & 7 deletions include/model/CEventRateModelFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace model {
//! This concrete factory implements the methods to make new models
//! and data gatherers, and create default priors suitable for the
//! CEventRateModel class.
class MODEL_EXPORT CEventRateModelFactory : public CModelFactory {
class MODEL_EXPORT CEventRateModelFactory final : public CModelFactory {
public:
//! Lift all overloads into scope.
using CModelFactory::defaultMultivariatePrior;
Expand All @@ -33,9 +33,10 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory {
//! intended for unit testing and are not necessarily good defaults.
//! The CModelConfig class is responsible for providing sensible
//! default values for the factory for use within our products.
explicit CEventRateModelFactory(const SModelParams& params,
model_t::ESummaryMode summaryMode = model_t::E_None,
const std::string& summaryCountFieldName = "");
CEventRateModelFactory(const SModelParams& params,
const TInterimBucketCorrectorWPtr& interimBucketCorrector,
model_t::ESummaryMode summaryMode = model_t::E_None,
const std::string& summaryCountFieldName = "");

//! Create a copy of the factory owned by the calling code.
virtual CEventRateModelFactory* clone() const;
Expand Down Expand Up @@ -140,7 +141,7 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory {

private:
//! The identifier of the search for which this generates models.
int m_Identifier;
int m_Identifier = 0;

//! Indicates whether the data being gathered are already summarized
//! by an external aggregation process
Expand All @@ -165,13 +166,13 @@ class MODEL_EXPORT CEventRateModelFactory : public CModelFactory {
TStrVec m_InfluenceFieldNames;

//! If true the models will process missing person fields.
bool m_UseNull;
bool m_UseNull = false;

//! The count features which will be modeled.
TFeatureVec m_Features;

//! The bucket results delay.
std::size_t m_BucketResultsDelay;
std::size_t m_BucketResultsDelay = 0;

//! A cached search key.
mutable TOptionalSearchKey m_SearchKeyCache;
Expand Down
Loading