Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

anomaly localization integration step 3 #114

Merged
merged 3 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,89 @@

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.ToString;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* Output of localized results.
*/
@Data
public class Output implements ToXContent {
@NoArgsConstructor
public class Output implements org.opensearch.ml.common.parameter.Output {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about rename the class name to avoid full class path org.opensearch.ml.common.parameter.Output as they have same class name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


public static final String FIELD_RESULTS = "results";
public static final String FIELD_NAME = "name";
public static final String FIELD_RESULT = "result";

private Map<String, Result> results = new HashMap<>(); // aggregation name to result.

public Output(StreamInput in) throws IOException {
this.results = in.readMap(StreamInput::readString, Result::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(results, StreamOutput::writeString, (o, r) -> r.writeTo(o));
}

/**
* Localized entity.
*/
@Data
public static class Entity implements ToXContent {
@NoArgsConstructor
public static class Entity implements org.opensearch.ml.common.parameter.Output {

public static final String FIELD_KEY = "key";
public static final String FIELD_CONTRIBUTION_VALUE = "contribution_value";
public static final String FIELD_BASE_VALUE = "base_value";
public static final String FIELD_NEW_VALUE = "new_value";

private List<String> key; // key of the entity
private double contributionValue; // computed contribution of the entity
private double baseValue; // base value of the entity
private double newValue; // new value of the entity

public Entity(StreamInput in) throws IOException {
this.key = in.readList(StreamInput::readString);
this.contributionValue = in.readDouble();
this.baseValue = in.readDouble();
this.newValue = in.readDouble();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringCollection(key);
out.writeDouble(contributionValue);
out.writeDouble(baseValue);
out.writeDouble(newValue);
}

@SneakyThrows
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) {
builder.startObject();
builder.field("key", this.key);
builder.field("contribution", this.contributionValue);
builder.field("baseValue", this.baseValue);
builder.field("newValue", this.newValue);
builder.field(FIELD_KEY, this.key);
builder.field(FIELD_CONTRIBUTION_VALUE, this.contributionValue);
builder.field(FIELD_BASE_VALUE, this.baseValue);
builder.field(FIELD_NEW_VALUE, this.newValue);
builder.endObject();
return builder;
}
Expand All @@ -62,9 +104,15 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
* Localized entities are bucketized by time.
*/
@Data
@NoArgsConstructor
@ToString(exclude = {"base", "counter", "completed"})
@EqualsAndHashCode(exclude = {"base", "counter", "completed"})
public static class Bucket implements ToXContent {
public static class Bucket implements org.opensearch.ml.common.parameter.Output {

public static final String FIELD_START_TIME = "start_time";
public static final String FIELD_END_TIME = "end_time";
public static final String FIELD_OVERALL_VALUE = "overall_aggregate_value";
public static final String FIELD_ENTITIES = "entities";

private long startTime; // start time of the bucket
private long endTime; // end time of the bucket
Expand All @@ -75,14 +123,29 @@ public static class Bucket implements ToXContent {
private Optional<Counter> counter = Optional.empty();
private AtomicBoolean completed = null;

public Bucket(StreamInput in) throws IOException {
this.startTime = in.readLong();
this.endTime = in.readLong();
this.overallAggValue = in.readDouble();
this.entities = in.readList(Entity::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(startTime);
out.writeLong(endTime);
out.writeDouble(overallAggValue);
out.writeList(entities);
}

@SneakyThrows
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) {
builder.startObject();
builder.field("start", this.startTime);
builder.field("end", this.endTime);
builder.field("aggregateValue", this.overallAggValue);
builder.field(FIELD_START_TIME, this.startTime);
builder.field(FIELD_END_TIME, this.endTime);
builder.field(FIELD_OVERALL_VALUE, this.overallAggValue);
if (this.entities != null && !this.entities.isEmpty()) {
builder.field("entities", this.entities);
builder.field(FIELD_ENTITIES, this.entities);
}
builder.endObject();
return builder;
Expand All @@ -93,14 +156,26 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
* Localized result.
*/
@Data
public static class Result implements ToXContent {
@NoArgsConstructor
public static class Result implements org.opensearch.ml.common.parameter.Output {

public static final String FIELD_BUCKETS = "buckets";

private List<Bucket> buckets = new ArrayList<>(); // localized results are bucketized by time

public Result(StreamInput in) throws IOException {
this.buckets = in.readList(Bucket::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(this.buckets);
}

@SneakyThrows
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) {
builder.startObject();
builder.field("buckets", this.buckets.toArray());
builder.field(FIELD_BUCKETS, this.buckets.toArray());
builder.endObject();
return builder;
}
Expand All @@ -110,15 +185,130 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
@SneakyThrows
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) {
builder.startObject();
builder.startArray("results");
builder.startArray(FIELD_RESULTS);
for (Map.Entry<String, Result> entry : this.results.entrySet()) {
builder.startObject();
builder.field("name", entry.getKey());
builder.field("result", entry.getValue());
builder.field(FIELD_NAME, entry.getKey());
builder.field(FIELD_RESULT, entry.getValue());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
}

public static Output parse(XContentParser parser) throws IOException {
Output output = new Output();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
switch (parser.currentName()) {
case FIELD_RESULTS:
parseResultMapEntry(parser, output);
break;
}
}
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
return output;
}

private static void parseResultMapEntry(XContentParser parser, Output output) throws IOException {
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
String key = null;
Output.Result result = new Output.Result();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
switch (parser.currentName()) {
case FIELD_NAME:
parser.nextToken();
key = parser.text();
break;
case FIELD_RESULT:
parseResult(parser, result);
break;
}
}
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
output.getResults().put(key, result);
}
ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
}

private static void parseResult(XContentParser parser, Output.Result result) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
switch (parser.currentName()) {
case Output.Result.FIELD_BUCKETS:
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
Output.Bucket bucket = new Output.Bucket();
parseBucket(parser, bucket);
result.getBuckets().add(bucket);
}
ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
break;
}
}
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
}

private static void parseBucket(XContentParser parser, Output.Bucket bucket) throws IOException {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
switch (parser.currentName()) {
case Output.Bucket.FIELD_START_TIME:
parser.nextToken();
bucket.setStartTime(parser.longValue());
break;
case Output.Bucket.FIELD_END_TIME:
parser.nextToken();
bucket.setEndTime(parser.longValue());
break;
case Output.Bucket.FIELD_OVERALL_VALUE:
parser.nextToken();
bucket.setOverallAggValue(parser.doubleValue());
break;
case Output.Bucket.FIELD_ENTITIES:
parseEntities(parser, bucket);
break;
}
}
ensureExpectedToken(XContentParser.Token.END_OBJECT, parser.currentToken(), parser);
}

private static void parseEntities(XContentParser parser, Output.Bucket bucket) throws IOException {
List<Output.Entity> entities = new ArrayList<>();
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser);
Output.Entity entity = new Output.Entity();
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
switch (parser.currentName()) {
case Output.Entity.FIELD_KEY:
List<String> key = new ArrayList<>();
ensureExpectedToken(XContentParser.Token.START_ARRAY, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
key.add(parser.text());
}
entity.setKey(key);
break;
case Output.Entity.FIELD_CONTRIBUTION_VALUE:
parser.nextToken();
entity.setContributionValue(parser.doubleValue());
break;
case Output.Entity.FIELD_BASE_VALUE:
parser.nextToken();
entity.setBaseValue(parser.doubleValue());
break;
case Output.Entity.FIELD_NEW_VALUE:
parser.nextToken();
entity.setNewValue(parser.doubleValue());
break;
Copy link
Collaborator

@ylwu-amzn ylwu-amzn Jan 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skip children for default case? Same question for other places

                default:
                    parser.skipChildren();
                    break;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
entities.add(entity);
}
bucket.setEntities(entities);
ensureExpectedToken(XContentParser.Token.END_ARRAY, parser.currentToken(), parser);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*
*/

package org.opensearch.ml.engine.algorithms.anomalylocalization;

import java.util.Arrays;

import org.junit.Before;
import org.junit.Test;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;

import static org.junit.Assert.assertEquals;

public class OutputTests {

private Output output;

@Before
public void setup() {
Output.Entity entity = new Output.Entity();
entity.setKey(Arrays.asList("key1"));
Output.Bucket bucket = new Output.Bucket();
bucket.setEntities(Arrays.asList(entity));
Output.Result result = new Output.Result();
result.setBuckets(Arrays.asList(bucket));
output = new Output();
output.getResults().put("agg", result);
}

@Test
public void testWriteable() throws Exception {
BytesStreamOutput out = new BytesStreamOutput();

output.writeTo(out);
Output newOutput = new Output(out.bytes().streamInput());

assertEquals(output, newOutput);
}

@Test
public void testXContent() throws Exception {
XContentBuilder builder = XContentFactory.jsonBuilder();

builder = output.toXContent(builder, null);

String json = Strings.toString(builder);
XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, null, json);
Output newOutput = Output.parse(parser);

assertEquals(output, newOutput);
}
}