Skip to content

Commit

Permalink
add anomaly localization implementation (#103)
Browse files Browse the repository at this point in the history
* add anomaly localization implementation

Signed-off-by: lai <57818076+wnbts@users.noreply.github.com>
  • Loading branch information
wnbts authored Dec 30, 2021
1 parent 30c436e commit 0b36612
Show file tree
Hide file tree
Showing 11 changed files with 1,269 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import org.opensearch.action.ActionListener;

/**
 * Localizes aggregate anomalies.
 *
 * <p>Given an aggregate anomaly described by {@code Input}, an implementation attributes the
 * anomalous change to the entities (attribute-value combinations) that contribute to it.
 */
public interface AnomalyLocalizer {

/**
 * Gets localized entity results for data specified in the input.
 *
 * @param input Information about aggregation and metadata.
 * @param listener Listener notified with localized details on success, or with an exception on failure.
 */
void getLocalizationResults(Input input, ActionListener<Output> listener);
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

import lombok.extern.log4j.Log4j2;

/**
 * CountMin sketch implementation.
 *
 * <p>Maintains a depth-by-width grid of counters; each of the {@code numHashes} rows hashes a key
 * into one bucket. An estimate is the minimum bucket value across rows, which upper-bounds the
 * true count (for non-negative updates).
 *
 * @see <a href="https://www.cs.tufts.edu/comp/150FP/archive/graham-cormode/count-min.pdf">Cormode, G., & Muthukrishnan, S. (2005). An improved data stream summary: the count-min sketch and its applications.</a>
 */
@Log4j2
public class CountMinSketch implements Counter {

    protected static final double INV_DELTOID = 1 / 0.01;
    protected static final double LOG_BASE_2 = 2;
    protected static final double INV_EPSILON = 1 / 0.001;

    private int numHashes;
    private int numBuckets;
    private double[][] counts;
    private int[] hashes;

    /**
     * Constructor. Sizes the sketch from the error (epsilon) and failure-probability (deltoid)
     * constants above and draws one random hash seed per row.
     */
    public CountMinSketch() {
        final int depth = (int) Math.ceil(Math.log(INV_DELTOID) / Math.log(LOG_BASE_2));
        final int width = (int) Math.ceil(INV_EPSILON);
        this.numHashes = depth;
        this.numBuckets = width;
        this.counts = new double[depth][width];
        this.hashes = new Random().ints(depth).toArray();
        log.info("count min sketch size " + depth + " * " + width + " = " + depth * width);
    }

    @Override
    public void increment(List<String> key, double value) {
        final int hashedKey = key.hashCode();
        // Add the value to this key's bucket in every row.
        IntStream.range(0, this.numHashes).forEach(row -> counts[row][getBucketIndex(hashedKey, row)] += value);
    }

    @Override
    public double estimate(List<String> key) {
        final int hashedKey = key.hashCode();
        // The least-collided row gives the tightest estimate; take the minimum across rows.
        double smallest = 0.0;
        boolean seen = false;
        for (int row = 0; row < this.numHashes; row++) {
            final double rowCount = counts[row][getBucketIndex(hashedKey, row)];
            if (!seen || rowCount < smallest) {
                smallest = rowCount;
                seen = true;
            }
        }
        return seen ? smallest : 0.0;
    }

    // Maps a key hash into a bucket for the given row by mixing with that row's random seed.
    private int getBucketIndex(int keyHash, int hashIndex) {
        return Math.floorMod(keyHash ^ this.hashes[hashIndex], this.numBuckets);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

import lombok.extern.log4j.Log4j2;

/**
 * Count sketch implementation.
 *
 * <p>Like a CountMin sketch, but each row also assigns the key a random +/-1 sign so collisions
 * cancel in expectation; the estimate is the median of the per-row signed counts, which supports
 * negative values.
 *
 * @see <a href="https://u.cs.biu.ac.il/~porat/2006/MDS/FrequentStream.pdf">Charikar, M., Chen, K., & Farach-Colton, M. (2002, July). Finding frequent items in data streams.</a>
 */
@Log4j2
public class CountSketch implements Counter {

    protected static final double INV_DELTOID = 1 / 0.01;
    protected static final double LOG_BASE_2 = 2;
    protected static final double INV_EPSILON = 1 / 0.001;

    private int numHashes;
    private int numBuckets;
    private double[][] counts;
    private int[] hashes;
    private int[] signHashes;

    /**
     * Constructor. Sizes the sketch from the error/failure-probability constants and draws two
     * independent random seeds per row: one for bucket placement, one for the sign.
     */
    public CountSketch() {
        final int depth = (int) Math.ceil(Math.log(INV_DELTOID) / Math.log(LOG_BASE_2));
        final int width = (int) Math.ceil(INV_EPSILON);
        this.numHashes = depth;
        this.numBuckets = width;
        this.counts = new double[depth][width];
        final Random random = new Random();
        this.hashes = random.ints(depth).toArray();
        this.signHashes = random.ints(depth).toArray();
        log.info("count sketch size " + depth + " * " + width + " = " + depth * width);
    }

    @Override
    public void increment(List<String> key, double value) {
        final int hashedKey = key.hashCode();
        // Apply the signed update to this key's bucket in every row.
        for (int row = 0; row < this.numHashes; row++) {
            counts[row][getBucketIndex(hashedKey, row)] += getCountSign(hashedKey, row) * value;
        }
    }

    @Override
    public double estimate(List<String> key) {
        final int hashedKey = key.hashCode();
        // Undo the sign per row, then take the median of the sorted per-row estimates.
        final double[] rowEstimates = IntStream
            .range(0, this.numHashes)
            .mapToDouble(row -> getCountSign(hashedKey, row) * counts[row][getBucketIndex(hashedKey, row)])
            .sorted()
            .toArray();
        final int size = rowEstimates.length;
        // Average of the two middle elements; for odd sizes both indices coincide.
        return (rowEstimates[(size - 1) / 2] + rowEstimates[size / 2]) / 2;
    }

    // Maps a key hash into a bucket for the given row by mixing with that row's bucket seed.
    private int getBucketIndex(int keyHash, int hashIndex) {
        return Math.floorMod(keyHash ^ this.hashes[hashIndex], this.numBuckets);
    }

    // Derives a +1/-1 sign for the key from that row's sign seed.
    private int getCountSign(int keyHash, int hashIndex) {
        return Math.floorMod(keyHash ^ this.signHashes[hashIndex], 2) * 2 - 1;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.List;

/**
 * Counts values for keys.
 */
public interface Counter {

/**
 * Increments the value for the key.
 *
 * @param key The key to increment value for.
 * @param value The value to increment by.
 */
void increment(List<String> key, double value);

/**
 * Gets the value for the key.
 *
 * @param key The key to get value for.
 * @return the (approximate/exact) value for the key
 */
double estimate(List<String> key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Data;
import lombok.extern.log4j.Log4j2;

/**
 * Hashmap-based exact counting.
 *
 * <p>Stores every key's running total in a map, so estimates are exact at the cost of memory
 * proportional to the number of distinct keys.
 */
@Data
@Log4j2
public class HashMapCounter implements Counter {

    private Map<List<String>, Double> keyValues = new HashMap<>();

    @Override
    public void increment(List<String> key, double value) {
        // Insert the value for a new key, or add it to the existing total.
        keyValues.merge(key, value, Double::sum);
    }

    @Override
    public double estimate(List<String> key) {
        // Unseen keys have an exact count of zero.
        return keyValues.getOrDefault(key, 0.0);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.List;
import java.util.Map;

import lombok.extern.log4j.Log4j2;

/**
 * A hybrid counter that starts with exact counting with map and switches to approximate counting with sketch as the size grows.
 *
 * <p>Counts exactly in a {@link HashMapCounter} until {@code SKETCH_THRESHOLD} increments have
 * been observed, then replays the accumulated totals into a sketch and delegates to it from then
 * on.
 */
@Log4j2
public class HybridCounter implements Counter {

    protected static int SKETCH_THRESHOLD = 10_000;

    private Counter counter = new HashMapCounter();
    private int count = 0;

    @Override
    public void increment(List<String> key, double value) {
        this.counter.increment(key, value);
        updateCount();
    }

    @Override
    public double estimate(List<String> key) {
        return this.counter.estimate(key);
    }

    // Tracks the number of increments and swaps the exact map for a sketch exactly once,
    // at the moment the threshold is reached.
    private void updateCount() {
        this.count++;
        if (this.count != SKETCH_THRESHOLD) {
            return;
        }
        final Map<List<String>, Double> exactCounts = ((HashMapCounter) this.counter).getKeyValues();
        // Aggregate values (e.g. averages) can be negative; CountMin only supports
        // non-negative counts, so fall back to the signed CountSketch in that case.
        final boolean anyNegative = exactCounts.values().stream().anyMatch(v -> v < 0);
        final Counter sketch = anyNegative ? new CountSketch() : new CountMinSketch();
        exactCounts.forEach(sketch::increment);
        this.counter = sketch;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.List;
import java.util.Optional;

import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.aggregations.AggregationBuilder;

import lombok.Data;

/**
 * Information about aggregate, time, etc to localize.
 *
 * <p>Immutable value holder (lombok {@code @Data} with final fields) describing which index,
 * fields, aggregations and time range the localization should run over.
 */
@Data
public class Input {

private final String indexName; // name pattern of the data index
private final List<String> attributeFieldNames; // name of the field to localize/slice with
private final List<AggregationBuilder> aggregations; // aggregate data to localize/slice on
private final String timeFieldName; // name of the timestamp field
private final long startTime; // start of entire time range, including normal and anomaly
private final long endTime; // end of entire time range, including normal and anomaly
private final long minTimeInterval; // minimal time interval/bucket
private final int numOutputs; // max number of values from localization/slicing
private final Optional<Long> anomalyStartTime; // time when anomaly change starts, empty if unknown
private final Optional<QueryBuilder> filterQuery; // optional filter of data
}
Loading

0 comments on commit 0b36612

Please sign in to comment.