Skip to content

Commit

Permalink
Page-Hinkley test stream aggregate implemented.
Browse files Browse the repository at this point in the history
  • Loading branch information
klemenkenda committed Oct 23, 2018
1 parent 01ccd8d commit b4ac47f
Show file tree
Hide file tree
Showing 5 changed files with 504 additions and 32 deletions.
95 changes: 95 additions & 0 deletions src/nodejs/qm/qm_nodejs_streamaggr.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
* @property {module:qm~StreamAggrThreshold} treshold - The threshold indicator type.
* @property {module:qm~StreamAggrTDigest} tdigest - The quantile estimator type. It estimates the quantiles of the given data using {@link module:analytics.TDigest TDigest}.
* @property {module:qm~StreamAggrRecordSwitch} record-switch-aggr - The record switch type.
* @property {module:qm~StreamAggrPageHinkley} pagehinkley - The Page-Hinkley test for concept drift detection type.
*/

/**
Expand Down Expand Up @@ -1817,6 +1818,100 @@
*
*/

/**
* @typedef {module:qm.StreamAggr} StreamAggrPageHinkley
* This stream aggregate enables detecting concept drift based on Page-Hinkley test
* on a stream of numeric data. It is based on the book
* Gamma, Knowledge Discovery from Data Streams, 2013, pp. 76
* <br>1. {@link module:qm.StreamAggr#getInteger} takes a string input (either `'drift'` or `'driftOffset'`)
* and returns returns the value of the property or `null` if it's unknown. `'drift'` is set to 1 if the drift
* has been detected in this step, otherise to 0. `'drift`' is always set to 0 in the next step. `'driftOffset'`
* monitors the offset (number of onStep calls) since last concept drift was detected.
* <br>2. {@link module:qm.StreamAggr#getParams} returns a parameter object.
* <br>3. {@link module:qm.StreamAggr#setParams} used for changing Page-Hinkley test parameters
*
* @property {string} [name] - The given name of the stream aggregator (autogenerated by default).
* @property {string} type - The type for the stream aggregator. <b>Important:</b> It must be equal to `'pagehinkley'`.
* @property {string} store - The name of the store consistent with the records that it processes.
* @property {string} inAggr - The name of the stream aggregate used for input (i.e. {@link module:qm.StreamAggrTimeSeriesTick}).
* @property {number} maxInstances - Minimal number of instances needed for initialization of the aggregator (when can first concept drift be initialized?).
* @property {number} delta - The delta factor for the Page Hinkley test.
* @property {number} lambda - The change detection threshold.
* @property {number} alpha - The forgetting factor, used to weight the observed value and the mean.
* @example
* // main library
* let qm = require('qminer');
*
* // create a base with a simple store
* // the store records results of clustering
* base = new qm.Base({
* mode: "createClean",
* schema: [{
* name: "Store",
* fields: [
* { name: "Value", type: "float" },
* { name: "Time", type: "datetime" }
* ]
* }]
* });
*
* // create a new time series stream aggregator for the 'Store' store that takes the recorded cluster id
* // and the timestamp from the 'Time' field. The size of the window is 2 hours.
* let timeser = {
* name: 'seriesTick1',
* type: 'timeSeriesTick',
* store: 'Store',
* timestamp: 'Time',
* value: 'Value'
* };
* let timeSeries1 = base.store("Store").addStreamAggr(timeser);
* // add a histogram aggregator, that is connected with the 'TimeSeries1' aggregator
* let aggrPHT = {
* name: 'PageHinkley',
* type: 'pagehinkley',
* store: 'Store',
* inAggr: 'seriesTick1',
* minInstances: 1,
* delta: 0.005,
* lambda: 50,
* alpha: 0.9999
* };
* store = base.store("Store");
* pht = store.addStreamAggr(aggrPHT);
*
* // creating start time
* let time = new Date();
* let timeStr = time.toISOString();
* let changes = 0;
*
* // simulating concept drift at element 1000 in a time series
* for (let i = 0; i < 2000; i++) {
* // add one second to the timestamp and create an ISO string
* time.setSeconds(time.getSeconds() + 1);
* let timeStr = time.toISOString();
* // create value
* let value = Math.random() * 1;
* if (i > 1000) {
* value = Math.random() * 2 + 1;
* }
* // adding values to the signal store
* store.push({ Time: timeStr, Value: value });
* // counting changes
* if (pht.saveJson().drift == 1) {
* changes++;
* }
* }
*
* // checking if drift has been correctly detected
* if (changes >= 1) console.log("Last concept drift was detected " + pht.val.driftOffset + " samples ago.");
* else console.log("No concept drift was detected!");
*
* // clean up
* base.close();
*
*/

class TNodeJsStreamAggr : public node::ObjectWrap {
friend class TNodeJsUtil;
private:
Expand Down
121 changes: 121 additions & 0 deletions src/qminer/qminer_aggr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2781,5 +2781,126 @@ PJsonVal THistogramAD::SaveJson(const int& Limit) const {
return Obj;
}


//////////////////////////////////////////////////////////////////////////////////
/// Page-Hinkley test for concept drift detection aggregate

void TPageHinkley::OnStep(const TWPt<TStreamAggr>& CallerAggr) {
TScopeStopWatch StopWatch(ExeTm);

if (Drift > 0) Reset();
TFlt X = InAggrVal->GetFlt();
XMean = XMean + (X - XMean) / (1.0 * SampleCount);
Sum = Alpha * Sum + (X - XMean - Delta);

SampleCount++;

/// Only update drift offset, if it has already occured
if (DriftOffset >= 0) DriftOffset++;

if ((SampleCount > MinInstances) && (Sum > Lambda)) {
Drift = 1;
DriftOffset = 0;
}
}

void TPageHinkley::OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr) {
throw TExcept::New("TPageHinkley::OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr) not supported.");
}

TPageHinkley::TPageHinkley(const TWPt<TBase>& Base, const PJsonVal& ParamVal) : TStreamAggr(Base, ParamVal) {
Reset();
// only set drift offset to negative value in the beginning
// DriftOffset = 0 is set, when the first drift has been detected
// after that the offset is incremented in each step so that we can
// monitor, when the concept drift appeared
DriftOffset = -1;
MinInstances = (int)ParamVal->GetObjNum("minInstances", 30);
Delta = (double)ParamVal->GetObjNum("delta", 0.005);
Lambda = (double)ParamVal->GetObjNum("lambda", 50.0);
Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999);
// parse out input aggregate
InAggrVal = Cast<TStreamAggrOut::IFlt>(ParseAggr(ParamVal, "inAggr"));
}

PJsonVal TPageHinkley::GetParams() const {
PJsonVal Obj = TJsonVal::NewObj();
Obj->AddToObj("minInstances", MinInstances);
Obj->AddToObj("delta", Delta);
Obj->AddToObj("lambda", Lambda);
Obj->AddToObj("alpha", Alpha);
return Obj;
}

void TPageHinkley::SetParams(const PJsonVal& ParamVal) {
MinInstances = (int)ParamVal->GetObjNum("minInstances", 30);
Delta = (double)ParamVal->GetObjNum("delta", 0.005);
Lambda = (double)ParamVal->GetObjNum("lambda", 50.0);
Alpha = (double)ParamVal->GetObjNum("alpha", 0.9999);
}

void TPageHinkley::LoadState(TSIn& SIn) {
MinInstances.Load(SIn);
Delta.Load(SIn);
Lambda.Load(SIn);
Alpha.Load(SIn);
XMean.Load(SIn);
SampleCount.Load(SIn);
Sum.Load(SIn);
Drift.Load(SIn);
DriftOffset.Load(SIn);
}

void TPageHinkley::SaveState(TSOut& SOut) const {
MinInstances.Save(SOut);
Delta.Save(SOut);
Lambda.Save(SOut);
Alpha.Save(SOut);
XMean.Save(SOut);
SampleCount.Save(SOut);
Sum.Save(SOut);
Drift.Save(SOut);
DriftOffset.Save(SOut);
}

void TPageHinkley::Reset() {
MinInstances = 30;
Delta = 0.005;
Lambda = 50.0;
Alpha = 0.9999;
XMean = 0.0;
SampleCount = 1;
Sum = 0.0;
Drift = 0;
DriftOffset = 0;
}

int TPageHinkley::GetInt() const {
return GetNmInt("drift");
}

int TPageHinkley::GetNmInt(const TStr& Nm) const {
if (Nm == "drift") {
return Drift;
}
else if (Nm == "driftOffset") {
return DriftOffset;
}
else {
throw TExcept::New("TPageHinkley::GetNmInt unknown key: " + Nm);
}
}

PJsonVal TPageHinkley::SaveJson(const int& Limit) const {
PJsonVal Obj = TJsonVal::NewObj();
Obj->AddToObj("minInstances", MinInstances);
Obj->AddToObj("delta", Delta);
Obj->AddToObj("lambda", Lambda);
Obj->AddToObj("alpha", Alpha);
Obj->AddToObj("drift", Drift);
Obj->AddToObj("driftOffset", DriftOffset);
return Obj;
}

} // TStreamAggrs namespace
} // TQm namespace
71 changes: 71 additions & 0 deletions src/qminer/qminer_aggr.h
Original file line number Diff line number Diff line change
Expand Up @@ -2199,6 +2199,77 @@ class THistogramAD : public TStreamAggr, public TStreamAggrOut::INmInt, public T
TStr Type() const { return GetType(); }
};

//////////////////////////////////////////////////////////////////////////////////
/// Page-Hinkley test for concept drift detection aggregate
///
/// This change detection method works by computing the observed values and their mean up to the current
/// moment. Page Hinkley won't output warning zone warnings, only change detections. The method works by
/// means of the Page Hinkley test.In general lines it will detect a concept drift if the observed mean
/// at some instant is greater then a threshold value lambda.
class TPageHinkley : public TStreamAggr,
public TStreamAggrOut::INmInt,
public TStreamAggrOut::IInt {
private:
/// Input for prediction
TWPt<TStreamAggrOut::IFlt> InAggrVal;
/// Minimum instances to trigger concept drift
TInt MinInstances;
/// The delta factor for Page-Hinkley test
TFlt Delta;
/// The change detection threshold
TFlt Lambda;
/// The forgetting factor, used to weight the observed value and the mean
TFlt Alpha;
/// Mean of the input values
TFlt XMean;
/// Count of the samples
TInt SampleCount;
/// Sum for calculating standard deviation
TFlt Sum;
/// Drift was detected
TInt Drift;
/// How long ago drift was detected
TInt DriftOffset;

protected:
/// Placeholder just throwing exception.
void OnTime(const uint64& TmMsec, const TWPt<TStreamAggr>& CallerAggr);
/// Placeholder just throwing exception.
void OnStep(const TWPt<TStreamAggr>& CallerAggr);
/// JSON based constructor.
TPageHinkley(const TWPt<TBase>& Base, const PJsonVal& ParamVal);
public:
/// Smart pointer constructor
static PStreamAggr New(const TWPt<TBase>& Base, const PJsonVal& ParamVal) { return new TPageHinkley(Base, ParamVal); }

/// Loads state
void LoadState(TSIn& SIn);
/// Saves state
void SaveState(TSOut& SOut) const;

/// Returns the parameters
PJsonVal GetParams() const;
/// Sets the parameters - used for updating the map (extend or replace)
void SetParams(const PJsonVal& ParamVal);

/// Is the aggregate initialized?
bool IsInit() const { return SampleCount > MinInstances; }
/// Returns true if the string is supported
bool IsNmInt(const TStr& Nm) const { return (Nm == "driftOffset") || (Nm == "drift"); }
/// Returns current drift status
int GetInt() const;
/// Returns the current drift status and last drift offset
int GetNmInt(const TStr& Nm) const;
/// Resets the aggregate
void Reset();
/// JSON serialization
PJsonVal SaveJson(const int& Limit) const;
/// Stream aggregator type name
static TStr GetType() { return "pagehinkley"; }
/// Stream aggregator type name
TStr Type() const { return GetType(); }
};

///////////////////////////////
/// Template class implementation
#include "qminer_aggr.hpp"
Expand Down
1 change: 1 addition & 0 deletions src/qminer/qminer_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6612,6 +6612,7 @@ void TStreamAggr::Init() {
Register<TStreamAggrs::TWinBufSpVecSum>();
Register<TStreamAggrs::TRecSwitchAggr>();
Register<TStreamAggrs::THistogramAD>();
Register<TStreamAggrs::TPageHinkley>();
Register<TStreamAggrs::TSwGk>();
}

Expand Down
Loading

0 comments on commit b4ac47f

Please sign in to comment.