diff --git a/src/nodejs/qm/qm_nodejs_streamaggr.h b/src/nodejs/qm/qm_nodejs_streamaggr.h index d070b57c7..57ffe7f95 100644 --- a/src/nodejs/qm/qm_nodejs_streamaggr.h +++ b/src/nodejs/qm/qm_nodejs_streamaggr.h @@ -1834,7 +1834,7 @@ * @property {string} type - The type for the stream aggregator. Important: It must be equal to `'pagehinkley'`. * @property {string} store - The name of the store consistent with the records that it processes. * @property {string} inAggr - The name of the stream aggregate used for input (i.e. {@link module:qm.StreamAggrTimeSeriesTick}). -* @property {number} maxInstances - Minimal number of instances needed for initialization of the aggregator (when can first concept drift be initialized?). +* @property {number} minInstances - Minimal number of instances needed for initialization of the aggregator (when can first concept drift be initialized?). * @property {number} delta - The delta factor for the Page Hinkley test. * @property {number} lambda - The change detection threshold. * @property {number} alpha - The forgetting factor, used to weight the observed value and the mean. @@ -1882,7 +1882,6 @@ * * // creating start time * let time = new Date(); -* let timeStr = time.toISOString(); * let changes = 0; * * // simulating concept drift at element 1000 in a time series @@ -1904,8 +1903,8 @@ * } * * // checking if drift has been correctly detected -* if (changes >= 1) console.log("Last concept drift was detected " + pht.val.driftOffset + " samples ago."); -* else console.log("No concept drift was detected!"); +* if (changes >= 1) { console.log("Last concept drift was detected " + pht.val.driftOffset + " samples ago."); } +* else { console.log("No concept drift was detected!"); } * * // clean up * base.close(); diff --git a/src/qminer/qminer_aggr.cpp b/src/qminer/qminer_aggr.cpp index aec978fd3..bd8a8a008 100644 --- a/src/qminer/qminer_aggr.cpp +++ b/src/qminer/qminer_aggr.cpp @@ -2785,121 +2785,112 @@ PJsonVal THistogramAD::SaveJson(const int& Limit) const { ////////////////////////////////////////////////////////////////////////////////// /// Page-Hinkley test for concept drift detection aggregate -void TPageHinkley::OnStep(const TWPt& CallerAggr) { - TScopeStopWatch StopWatch(ExeTm); - - if (Drift > 0) Reset(); - TFlt X = InAggrVal->GetFlt(); - XMean = XMean + (X - XMean) / (1.0 * SampleCount); - Sum = Alpha * Sum + (X - XMean - Delta); - - SampleCount++; - - /// Only update drift offset, if it has already occured - if (DriftOffset >= 0) DriftOffset++; - - if ((SampleCount > MinInstances) && (Sum > Lambda)) { - Drift = 1; - DriftOffset = 0; - } +void TPageHinkley::OnStep(const TWPt& CallerAggr) { + TScopeStopWatch StopWatch(ExeTm); + + if (Drift > 0) { Reset(); }; + double X = InAggrVal->GetFlt(); + XMean = XMean + (X - XMean) / (double)SampleCount; + Sum = Alpha * Sum + (X - XMean - Delta); + + SampleCount++; + + /// Only update drift offset, if it has already occured + if (DriftOffset >= 0) { DriftOffset++; }; + + if ((SampleCount > MinInstances) && (Sum > Lambda)) { + Drift = 1; + DriftOffset = 0; + } } void TPageHinkley::OnTime(const uint64& TmMsec, const TWPt& CallerAggr) { - throw TExcept::New("TPageHinkley::OnTime(const uint64& TmMsec, const TWPt& CallerAggr) not supported."); + throw TExcept::New("TPageHinkley::OnTime(const uint64& TmMsec, const TWPt& CallerAggr) not supported."); } TPageHinkley::TPageHinkley(const TWPt& Base, const PJsonVal& ParamVal) : TStreamAggr(Base, ParamVal) { - Reset(); - // only set drift offset to negative value in the beginning - // DriftOffset = 0 is set, when the first drift has been detected - // after that the offset is incremented in each step so that we can - // monitor, when the concept drift appeared - DriftOffset = -1; - MinInstances = (int)ParamVal->GetObjNum("minInstances", 30); - Delta = (double)ParamVal->GetObjNum("delta", 0.005); - Lambda = (double)ParamVal->GetObjNum("lambda", 50.0); - Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999); - // parse out input aggregate - InAggrVal = Cast(ParseAggr(ParamVal, "inAggr")); + Reset(); + // only set drift offset to negative value in the beginning + // DriftOffset = 0 is set, when the first drift has been detected + // after that the offset is incremented in each step so that we can + // monitor, when the concept drift appeared + DriftOffset = -1; + MinInstances = (int)ParamVal->GetObjNum("minInstances", 30); + Delta = (double)ParamVal->GetObjNum("delta", 0.005); + Lambda = (double)ParamVal->GetObjNum("lambda", 50.0); + Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999); + // parse out input aggregate + InAggrVal = Cast(ParseAggr(ParamVal, "inAggr")); } PJsonVal TPageHinkley::GetParams() const { - PJsonVal Obj = TJsonVal::NewObj(); - Obj->AddToObj("minInstances", MinInstances); - Obj->AddToObj("delta", Delta); - Obj->AddToObj("lambda", Lambda); - Obj->AddToObj("alpha", Alpha); - return Obj; + PJsonVal Obj = TJsonVal::NewObj(); + Obj->AddToObj("minInstances", MinInstances); + Obj->AddToObj("delta", Delta); + Obj->AddToObj("lambda", Lambda); + Obj->AddToObj("alpha", Alpha); + return Obj; } void TPageHinkley::SetParams(const PJsonVal& ParamVal) { - MinInstances = (int)ParamVal->GetObjNum("minInstances", 30); - Delta = (double)ParamVal->GetObjNum("delta", 0.005); - Lambda = (double)ParamVal->GetObjNum("lambda", 50.0); - Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999); -} - -void TPageHinkley::LoadState(TSIn& SIn) { - MinInstances.Load(SIn); - Delta.Load(SIn); - Lambda.Load(SIn); - Alpha.Load(SIn); - XMean.Load(SIn); - SampleCount.Load(SIn); - Sum.Load(SIn); - Drift.Load(SIn); - DriftOffset.Load(SIn); -} - -void TPageHinkley::SaveState(TSOut& SOut) const { - MinInstances.Save(SOut); - Delta.Save(SOut); - Lambda.Save(SOut); - Alpha.Save(SOut); - XMean.Save(SOut); - SampleCount.Save(SOut); - Sum.Save(SOut); - Drift.Save(SOut); - DriftOffset.Save(SOut); + MinInstances = (int)ParamVal->GetObjNum("minInstances", 30); + Delta = (double)ParamVal->GetObjNum("delta", 0.005); + Lambda = (double)ParamVal->GetObjNum("lambda", 50.0); + Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999); +} + +void TPageHinkley::LoadState(TSIn& SIn) { + XMean.Load(SIn); + SampleCount.Load(SIn); + Sum.Load(SIn); + Drift.Load(SIn); + DriftOffset.Load(SIn); +} + +void TPageHinkley::SaveState(TSOut& SOut) const { + XMean.Save(SOut); + SampleCount.Save(SOut); + Sum.Save(SOut); + Drift.Save(SOut); + DriftOffset.Save(SOut); } void TPageHinkley::Reset() { - MinInstances = 30; - Delta = 0.005; - Lambda = 50.0; - Alpha = 0.9999; - XMean = 0.0; - SampleCount = 1; - Sum = 0.0; - Drift = 0; - DriftOffset = 0; + Delta = 0.005; + Lambda = 50.0; + Alpha = 0.9999; + XMean = 0.0; + SampleCount = 1; + Sum = 0.0; + Drift = 0; + DriftOffset = 0; } int TPageHinkley::GetInt() const { - return GetNmInt("drift"); + return GetNmInt("drift"); } int TPageHinkley::GetNmInt(const TStr& Nm) const { - if (Nm == "drift") { - return Drift; - } - else if (Nm == "driftOffset") { - return DriftOffset; - } - else { - throw TExcept::New("TPageHinkley::GetNmInt unknown key: " + Nm); - } + if (Nm == "drift") { + return Drift; + } + else if (Nm == "driftOffset") { + return DriftOffset; + } + else { + throw TExcept::New("TPageHinkley::GetNmInt unknown key: " + Nm); + } } PJsonVal TPageHinkley::SaveJson(const int& Limit) const { - PJsonVal Obj = TJsonVal::NewObj(); - Obj->AddToObj("minInstances", MinInstances); - Obj->AddToObj("delta", Delta); - Obj->AddToObj("lambda", Lambda); - Obj->AddToObj("alpha", Alpha); - Obj->AddToObj("drift", Drift); - Obj->AddToObj("driftOffset", DriftOffset); - return Obj; + PJsonVal Obj = TJsonVal::NewObj(); + Obj->AddToObj("minInstances", MinInstances); + Obj->AddToObj("delta", Delta); + Obj->AddToObj("lambda", Lambda); + Obj->AddToObj("alpha", Alpha); + Obj->AddToObj("drift", Drift); + Obj->AddToObj("driftOffset", DriftOffset); + return Obj; } } // TStreamAggrs namespace diff --git a/src/qminer/qminer_aggr.h b/src/qminer/qminer_aggr.h index ea4823e4a..7a6cf9a82 100644 --- a/src/qminer/qminer_aggr.h +++ b/src/qminer/qminer_aggr.h @@ -2206,68 +2206,68 @@ class THistogramAD : public TStreamAggr, public TStreamAggrOut::INmInt, public T /// moment. Page Hinkley won't output warning zone warnings, only change detections. The method works by /// means of the Page Hinkley test.In general lines it will detect a concept drift if the observed mean /// at some instant is greater then a threshold value lambda. -class TPageHinkley : public TStreamAggr, - public TStreamAggrOut::INmInt, - public TStreamAggrOut::IInt { +class TPageHinkley : public TStreamAggr, + public TStreamAggrOut::INmInt, + public TStreamAggrOut::IInt { private: - /// Input for prediction - TWPt InAggrVal; - /// Minimum instances to trigger concept drift - TInt MinInstances; - /// The delta factor for Page-Hinkley test - TFlt Delta; - /// The change detection threshold - TFlt Lambda; - /// The forgetting factor, used to weight the observed value and the mean - TFlt Alpha; - /// Mean of the input values - TFlt XMean; - /// Count of the samples - TInt SampleCount; - /// Sum for calculating standard deviation - TFlt Sum; - /// Drift was detected - TInt Drift; - /// How long ago drift was detected - TInt DriftOffset; + /// Input for prediction + TWPt InAggrVal; + /// Minimum instances to trigger concept drift + TInt MinInstances; + /// The delta factor for Page-Hinkley test + TFlt Delta; + /// The change detection threshold + TFlt Lambda; + /// The forgetting factor, used to weight the observed value and the mean + TFlt Alpha; + /// Mean of the input values + TFlt XMean; + /// Count of the samples + TInt SampleCount; + /// Sum for calculating standard deviation + TFlt Sum; + /// Drift was detected + TInt Drift; + /// How long ago drift was detected + TInt DriftOffset; protected: - /// Placeholder just throwing exception. - void OnTime(const uint64& TmMsec, const TWPt& CallerAggr); - /// Placeholder just throwing exception. - void OnStep(const TWPt& CallerAggr); - /// JSON based constructor. - TPageHinkley(const TWPt& Base, const PJsonVal& ParamVal); + /// Placeholder just throwing exception. + void OnTime(const uint64& TmMsec, const TWPt& CallerAggr); + /// Placeholder just throwing exception. + void OnStep(const TWPt& CallerAggr); + /// JSON based constructor. + TPageHinkley(const TWPt& Base, const PJsonVal& ParamVal); public: - /// Smart pointer constructor - static PStreamAggr New(const TWPt& Base, const PJsonVal& ParamVal) { return new TPageHinkley(Base, ParamVal); } - - /// Loads state - void LoadState(TSIn& SIn); - /// Saves state - void SaveState(TSOut& SOut) const; + /// Smart pointer constructor + static PStreamAggr New(const TWPt& Base, const PJsonVal& ParamVal) { return new TPageHinkley(Base, ParamVal); } - /// Returns the parameters - PJsonVal GetParams() const; - /// Sets the parameters - used for updating the map (extend or replace) - void SetParams(const PJsonVal& ParamVal); - - /// Is the aggregate initialized? - bool IsInit() const { return SampleCount > MinInstances; } - /// Returns true if the string is supported - bool IsNmInt(const TStr& Nm) const { return (Nm == "driftOffset") || (Nm == "drift"); } - /// Returns current drift status - int GetInt() const; - /// Returns the current drift status and last drift offset - int GetNmInt(const TStr& Nm) const; - /// Resets the aggregate - void Reset(); - /// JSON serialization - PJsonVal SaveJson(const int& Limit) const; - /// Stream aggregator type name - static TStr GetType() { return "pagehinkley"; } - /// Stream aggregator type name - TStr Type() const { return GetType(); } + /// Loads state + void LoadState(TSIn& SIn); + /// Saves state + void SaveState(TSOut& SOut) const; + + /// Returns the parameters + PJsonVal GetParams() const; + /// Sets the parameters - used for updating the map (extend or replace) + void SetParams(const PJsonVal& ParamVal); + + /// Is the aggregate initialized? + bool IsInit() const { return SampleCount > MinInstances; } + /// Returns true if the string is supported + bool IsNmInt(const TStr& Nm) const { return (Nm == "driftOffset") || (Nm == "drift"); } + /// Returns current drift status + int GetInt() const; + /// Returns the current drift status and last drift offset + int GetNmInt(const TStr& Nm) const; + /// Resets the aggregate + void Reset(); + /// JSON serialization + PJsonVal SaveJson(const int& Limit) const; + /// Stream aggregator type name + static TStr GetType() { return "pagehinkley"; } + /// Stream aggregator type name + TStr Type() const { return GetType(); } }; /////////////////////////////// diff --git a/src/qminer/qminer_core.cpp b/src/qminer/qminer_core.cpp index 4b9e789e6..605f44009 100644 --- a/src/qminer/qminer_core.cpp +++ b/src/qminer/qminer_core.cpp @@ -6612,7 +6612,7 @@ void TStreamAggr::Init() { Register(); Register(); Register(); - Register(); + Register(); Register(); } diff --git a/test/nodejs/streamaggr2.js b/test/nodejs/streamaggr2.js index 05c19993f..891fa6fc4 100644 --- a/test/nodejs/streamaggr2.js +++ b/test/nodejs/streamaggr2.js @@ -1374,7 +1374,6 @@ describe("test concept drift methods", function() { it ('should not detect concept drift', function() { // creating start time let time = new Date(); - let timeStr = time.toISOString(); let changes = 0; time.setSeconds(time.getSeconds() - 1000); @@ -1391,7 +1390,7 @@ describe("test concept drift methods", function() { changes++; } } - // checking if drift has been correctly detected + // checking that drift has not been detected assert.equal(changes, 0); driftOffset = pht.saveJson().driftOffset; assert.equal(driftOffset, -1); @@ -1400,7 +1399,6 @@ describe("test concept drift methods", function() { it('should detect concept drift in proximity of simulated change', function() { // creating start time let time = new Date(); - let timeStr = time.toISOString(); let changes = 0; // simulating concept drift at element 1000 in a time series