Skip to content

Commit

Permalink
Minor cosmetic fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
klemenkenda committed Oct 24, 2018
1 parent b4ac47f commit 65f0113
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 156 deletions.
7 changes: 3 additions & 4 deletions src/nodejs/qm/qm_nodejs_streamaggr.h
Original file line number Diff line number Diff line change
Expand Up @@ -1834,7 +1834,7 @@
* @property {string} type - The type for the stream aggregator. <b>Important:</b> It must be equal to `'pagehinkley'`.
* @property {string} store - The name of the store consistent with the records that it processes.
* @property {string} inAggr - The name of the stream aggregate used for input (i.e. {@link module:qm.StreamAggrTimeSeriesTick}).
* @property {number} maxInstances - Minimal number of instances needed for initialization of the aggregator (when can first concept drift be initialized?).
* @property {number} minInstances - Minimal number of instances needed for initialization of the aggregator (when can first concept drift be initialized?).
* @property {number} delta - The delta factor for the Page Hinkley test.
* @property {number} lambda - The change detection threshold.
* @property {number} alpha - The forgetting factor, used to weight the observed value and the mean.
Expand Down Expand Up @@ -1882,7 +1882,6 @@
*
* // creating start time
* let time = new Date();
* let timeStr = time.toISOString();
* let changes = 0;
*
* // simulating concept drift at element 1000 in a time series
Expand All @@ -1904,8 +1903,8 @@
* }
*
* // checking if drift has been correctly detected
* if (changes >= 1) console.log("Last concept drift was detected " + pht.val.driftOffset + " samples ago.");
* else console.log("No concept drift was detected!");
* if (changes >= 1) { console.log("Last concept drift was detected " + pht.val.driftOffset + " samples ago."); }
* else { console.log("No concept drift was detected!"); }
*
* // clean up
* base.close();
Expand Down
173 changes: 82 additions & 91 deletions src/qminer/qminer_aggr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2785,121 +2785,112 @@ PJsonVal THistogramAD::SaveJson(const int& Limit) const {
//////////////////////////////////////////////////////////////////////////////////
/// Page-Hinkley test for concept drift detection aggregate

void TPageHinkley::OnStep(const TWPt<TStreamAggr>& CallerAggr) {
TScopeStopWatch StopWatch(ExeTm);
if (Drift > 0) Reset();
TFlt X = InAggrVal->GetFlt();
XMean = XMean + (X - XMean) / (1.0 * SampleCount);
Sum = Alpha * Sum + (X - XMean - Delta);
SampleCount++;
/// Only update the drift offset if a drift has already occurred
if (DriftOffset >= 0) DriftOffset++;

if ((SampleCount > MinInstances) && (Sum > Lambda)) {
Drift = 1;
DriftOffset = 0;
}
void TPageHinkley::OnStep(const TWPt<TStreamAggr>& CallerAggr) {
TScopeStopWatch StopWatch(ExeTm);

if (Drift > 0) { Reset(); };
double X = InAggrVal->GetFlt();
XMean = XMean + (X - XMean) / (double)SampleCount;
Sum = Alpha * Sum + (X - XMean - Delta);

SampleCount++;

/// Only update the drift offset if a drift has already occurred
if (DriftOffset >= 0) { DriftOffset++; };

if ((SampleCount > MinInstances) && (Sum > Lambda)) {
Drift = 1;
DriftOffset = 0;
}
}

void TPageHinkley::OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr) {
throw TExcept::New("TPageHinkley::OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr) not supported.");
throw TExcept::New("TPageHinkley::OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr) not supported.");
}

TPageHinkley::TPageHinkley(const TWPt<TBase>& Base, const PJsonVal& ParamVal) : TStreamAggr(Base, ParamVal) {
Reset();
// only set drift offset to negative value in the beginning
// DriftOffset = 0 is set, when the first drift has been detected
// after that the offset is incremented in each step so that we can
// monitor, when the concept drift appeared
DriftOffset = -1;
MinInstances = (int)ParamVal->GetObjNum("minInstances", 30);
Delta = (double)ParamVal->GetObjNum("delta", 0.005);
Lambda = (double)ParamVal->GetObjNum("lambda", 50.0);
Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999);
// parse out input aggregate
InAggrVal = Cast<TStreamAggrOut::IFlt>(ParseAggr(ParamVal, "inAggr"));
Reset();
// only set drift offset to negative value in the beginning
// DriftOffset = 0 is set, when the first drift has been detected
// after that the offset is incremented in each step so that we can
// monitor, when the concept drift appeared
DriftOffset = -1;
MinInstances = (int)ParamVal->GetObjNum("minInstances", 30);
Delta = (double)ParamVal->GetObjNum("delta", 0.005);
Lambda = (double)ParamVal->GetObjNum("lambda", 50.0);
Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999);
// parse out input aggregate
InAggrVal = Cast<TStreamAggrOut::IFlt>(ParseAggr(ParamVal, "inAggr"));
}

PJsonVal TPageHinkley::GetParams() const {
PJsonVal Obj = TJsonVal::NewObj();
Obj->AddToObj("minInstances", MinInstances);
Obj->AddToObj("delta", Delta);
Obj->AddToObj("lambda", Lambda);
Obj->AddToObj("alpha", Alpha);
return Obj;
PJsonVal Obj = TJsonVal::NewObj();
Obj->AddToObj("minInstances", MinInstances);
Obj->AddToObj("delta", Delta);
Obj->AddToObj("lambda", Lambda);
Obj->AddToObj("alpha", Alpha);
return Obj;
}

void TPageHinkley::SetParams(const PJsonVal& ParamVal) {
MinInstances = (int)ParamVal->GetObjNum("minInstances", 30);
Delta = (double)ParamVal->GetObjNum("delta", 0.005);
Lambda = (double)ParamVal->GetObjNum("lambda", 50.0);
Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999);
}

void TPageHinkley::LoadState(TSIn& SIn) {
MinInstances.Load(SIn);
Delta.Load(SIn);
Lambda.Load(SIn);
Alpha.Load(SIn);
XMean.Load(SIn);
SampleCount.Load(SIn);
Sum.Load(SIn);
Drift.Load(SIn);
DriftOffset.Load(SIn);
}

void TPageHinkley::SaveState(TSOut& SOut) const {
MinInstances.Save(SOut);
Delta.Save(SOut);
Lambda.Save(SOut);
Alpha.Save(SOut);
XMean.Save(SOut);
SampleCount.Save(SOut);
Sum.Save(SOut);
Drift.Save(SOut);
DriftOffset.Save(SOut);
MinInstances = (int)ParamVal->GetObjNum("minInstances", 30);
Delta = (double)ParamVal->GetObjNum("delta", 0.005);
Lambda = (double)ParamVal->GetObjNum("lambda", 50.0);
Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999);
}

/// Restores the running state of the Page-Hinkley aggregate from the input stream.
/// Only the running values are read (mean, sample count, cumulative sum, drift flag
/// and drift offset); MinInstances, Delta, Lambda and Alpha are not read here.
/// The read order matches the write order used in SaveState.
void TPageHinkley::LoadState(TSIn& SIn) {
XMean.Load(SIn);
SampleCount.Load(SIn);
Sum.Load(SIn);
Drift.Load(SIn);
DriftOffset.Load(SIn);
}

/// Writes the running state of the Page-Hinkley aggregate to the output stream.
/// Only the running values are written (mean, sample count, cumulative sum, drift flag
/// and drift offset); MinInstances, Delta, Lambda and Alpha are not written here.
/// The write order matches the read order used in LoadState.
void TPageHinkley::SaveState(TSOut& SOut) const {
XMean.Save(SOut);
SampleCount.Save(SOut);
Sum.Save(SOut);
Drift.Save(SOut);
DriftOffset.Save(SOut);
}

void TPageHinkley::Reset() {
MinInstances = 30;
Delta = 0.005;
Lambda = 50.0;
Alpha = 0.9999;
XMean = 0.0;
SampleCount = 1;
Sum = 0.0;
Drift = 0;
DriftOffset = 0;
Delta = 0.005;
Lambda = 50.0;
Alpha = 0.9999;
XMean = 0.0;
SampleCount = 1;
Sum = 0.0;
Drift = 0;
DriftOffset = 0;
}

int TPageHinkley::GetInt() const {
return GetNmInt("drift");
return GetNmInt("drift");
}

int TPageHinkley::GetNmInt(const TStr& Nm) const {
if (Nm == "drift") {
return Drift;
}
else if (Nm == "driftOffset") {
return DriftOffset;
}
else {
throw TExcept::New("TPageHinkley::GetNmInt unknown key: " + Nm);
}
if (Nm == "drift") {
return Drift;
}
else if (Nm == "driftOffset") {
return DriftOffset;
}
else {
throw TExcept::New("TPageHinkley::GetNmInt unknown key: " + Nm);
}
}

PJsonVal TPageHinkley::SaveJson(const int& Limit) const {
PJsonVal Obj = TJsonVal::NewObj();
Obj->AddToObj("minInstances", MinInstances);
Obj->AddToObj("delta", Delta);
Obj->AddToObj("lambda", Lambda);
Obj->AddToObj("alpha", Alpha);
Obj->AddToObj("drift", Drift);
Obj->AddToObj("driftOffset", DriftOffset);
return Obj;
PJsonVal Obj = TJsonVal::NewObj();
Obj->AddToObj("minInstances", MinInstances);
Obj->AddToObj("delta", Delta);
Obj->AddToObj("lambda", Lambda);
Obj->AddToObj("alpha", Alpha);
Obj->AddToObj("drift", Drift);
Obj->AddToObj("driftOffset", DriftOffset);
return Obj;
}

} // TStreamAggrs namespace
Expand Down
114 changes: 57 additions & 57 deletions src/qminer/qminer_aggr.h
Original file line number Diff line number Diff line change
Expand Up @@ -2206,68 +2206,68 @@ class THistogramAD : public TStreamAggr, public TStreamAggrOut::INmInt, public T
/// moment. Page Hinkley won't output warning zone warnings, only change detections. The method works by
/// means of the Page Hinkley test. In general terms, it will detect a concept drift if the observed mean
/// at some instant is greater than a threshold value lambda.
class TPageHinkley : public TStreamAggr,
public TStreamAggrOut::INmInt,
public TStreamAggrOut::IInt {
class TPageHinkley : public TStreamAggr,
public TStreamAggrOut::INmInt,
public TStreamAggrOut::IInt {
private:
/// Input for prediction
TWPt<TStreamAggrOut::IFlt> InAggrVal;
/// Minimum instances to trigger concept drift
TInt MinInstances;
/// The delta factor for Page-Hinkley test
TFlt Delta;
/// The change detection threshold
TFlt Lambda;
/// The forgetting factor, used to weight the observed value and the mean
TFlt Alpha;
/// Mean of the input values
TFlt XMean;
/// Count of the samples
TInt SampleCount;
/// Cumulative Page-Hinkley sum that is compared against lambda
TFlt Sum;
/// Drift was detected
TInt Drift;
/// How long ago drift was detected
TInt DriftOffset;
/// Input for prediction
TWPt<TStreamAggrOut::IFlt> InAggrVal;
/// Minimum instances to trigger concept drift
TInt MinInstances;
/// The delta factor for Page-Hinkley test
TFlt Delta;
/// The change detection threshold
TFlt Lambda;
/// The forgetting factor, used to weight the observed value and the mean
TFlt Alpha;
/// Mean of the input values
TFlt XMean;
/// Count of the samples
TInt SampleCount;
/// Cumulative Page-Hinkley sum that is compared against lambda
TFlt Sum;
/// Drift was detected
TInt Drift;
/// How long ago drift was detected
TInt DriftOffset;

protected:
/// Placeholder just throwing exception.
void OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr);
/// Updates the Page-Hinkley statistics with the latest input value and flags a drift.
void OnStep(const TWPt<TStreamAggr>& CallerAggr);
/// JSON based constructor.
TPageHinkley(const TWPt<TBase>& Base, const PJsonVal& ParamVal);
/// Placeholder just throwing exception.
void OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr);
/// Updates the Page-Hinkley statistics with the latest input value and flags a drift.
void OnStep(const TWPt<TStreamAggr>& CallerAggr);
/// JSON based constructor.
TPageHinkley(const TWPt<TBase>& Base, const PJsonVal& ParamVal);
public:
/// Smart pointer constructor
static PStreamAggr New(const TWPt<TBase>& Base, const PJsonVal& ParamVal) { return new TPageHinkley(Base, ParamVal); }

/// Loads state
void LoadState(TSIn& SIn);
/// Saves state
void SaveState(TSOut& SOut) const;
/// Smart pointer constructor
static PStreamAggr New(const TWPt<TBase>& Base, const PJsonVal& ParamVal) { return new TPageHinkley(Base, ParamVal); }

/// Returns the parameters
PJsonVal GetParams() const;
/// Sets the parameters - used for updating the map (extend or replace)
void SetParams(const PJsonVal& ParamVal);

/// Is the aggregate initialized?
bool IsInit() const { return SampleCount > MinInstances; }
/// Returns true if the string is supported
bool IsNmInt(const TStr& Nm) const { return (Nm == "driftOffset") || (Nm == "drift"); }
/// Returns current drift status
int GetInt() const;
/// Returns the current drift status and last drift offset
int GetNmInt(const TStr& Nm) const;
/// Resets the aggregate
void Reset();
/// JSON serialization
PJsonVal SaveJson(const int& Limit) const;
/// Stream aggregator type name
static TStr GetType() { return "pagehinkley"; }
/// Stream aggregator type name
TStr Type() const { return GetType(); }
/// Loads state
void LoadState(TSIn& SIn);
/// Saves state
void SaveState(TSOut& SOut) const;

/// Returns the parameters
PJsonVal GetParams() const;
/// Sets the parameters - used for updating the map (extend or replace)
void SetParams(const PJsonVal& ParamVal);

/// Is the aggregate initialized?
bool IsInit() const { return SampleCount > MinInstances; }
/// Returns true if the string is supported
bool IsNmInt(const TStr& Nm) const { return (Nm == "driftOffset") || (Nm == "drift"); }
/// Returns current drift status
int GetInt() const;
/// Returns the current drift status and last drift offset
int GetNmInt(const TStr& Nm) const;
/// Resets the aggregate
void Reset();
/// JSON serialization
PJsonVal SaveJson(const int& Limit) const;
/// Stream aggregator type name
static TStr GetType() { return "pagehinkley"; }
/// Stream aggregator type name
TStr Type() const { return GetType(); }
};

///////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion src/qminer/qminer_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6612,7 +6612,7 @@ void TStreamAggr::Init() {
Register<TStreamAggrs::TWinBufSpVecSum>();
Register<TStreamAggrs::TRecSwitchAggr>();
Register<TStreamAggrs::THistogramAD>();
Register<TStreamAggrs::TPageHinkley>();
Register<TStreamAggrs::TPageHinkley>();
Register<TStreamAggrs::TSwGk>();
}

Expand Down
4 changes: 1 addition & 3 deletions test/nodejs/streamaggr2.js
Original file line number Diff line number Diff line change
Expand Up @@ -1374,7 +1374,6 @@ describe("test concept drift methods", function() {
it ('should not detect concept drift', function() {
// creating start time
let time = new Date();
let timeStr = time.toISOString();
let changes = 0;
time.setSeconds(time.getSeconds() - 1000);

Expand All @@ -1391,7 +1390,7 @@ describe("test concept drift methods", function() {
changes++;
}
}
// checking if drift has been correctly detected
// checking that drift has not been detected
assert.equal(changes, 0);
driftOffset = pht.saveJson().driftOffset;
assert.equal(driftOffset, -1);
Expand All @@ -1400,7 +1399,6 @@ describe("test concept drift methods", function() {
it('should detect concept drift in proximity of simulated change', function() {
// creating start time
let time = new Date();
let timeStr = time.toISOString();
let changes = 0;

// simulating concept drift at element 1000 in a time series
Expand Down

0 comments on commit 65f0113

Please sign in to comment.