[ML] Improve uniqueness of result document IDs #50644

Merged: 5 commits, Jan 7, 2020
Changes from all commits

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningField.java
@@ -5,10 +5,18 @@
*/
package org.elasticsearch.xpack.core.ml;

+import org.elasticsearch.common.Numbers;
+import org.elasticsearch.common.hash.MurmurHash3;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.stream.Collectors;

public final class MachineLearningField {
public static final Setting<Boolean> AUTODETECT_PROCESS =
Setting.boolSetting("xpack.ml.autodetect_process", true, Setting.Property.NodeScope);
@@ -19,4 +27,13 @@ public final class MachineLearningField

private MachineLearningField() {}

+    public static String valuesToId(String... values) {
+        String combined = Arrays.stream(values).filter(Objects::nonNull).collect(Collectors.joining());
+        byte[] bytes = combined.getBytes(StandardCharsets.UTF_8);
+        MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());
+        byte[] hashedBytes = new byte[16];
+        System.arraycopy(Numbers.longToBytes(hash.h1), 0, hashedBytes, 0, 8);
+        System.arraycopy(Numbers.longToBytes(hash.h2), 0, hashedBytes, 8, 8);
+        return new BigInteger(hashedBytes) + "_" + combined.length();
+    }
}
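
The helper above replaces the old per-class ID suffix, Objects.hash(values) + "_" + totalLength. A 32-bit hash over a few short field values collides easily, and since results are indexed under these IDs, a collision means one result document silently overwrites another. valuesToId instead concatenates the non-null values, hashes them with 128-bit MurmurHash3, and still appends the combined length. For all-null values the empty string hashes to zero, giving the suffix "0_0" that now appears literally in the updated test expectations below. A minimal standalone sketch (not part of this change) of the kind of collision the old scheme allowed, using the classic "Aa"/"BB" pair whose 31-based String hashes are equal:

    import java.util.Objects;

    public class OldIdCollisionDemo {
        public static void main(String[] args) {
            // "Aa" and "BB" share a String.hashCode (2112) and a length (2),
            // so the old-style "<hash>_<length>" suffix is identical for both:
            System.out.println(Objects.hash("Aa") + "_" + "Aa".length()); // 2143_2
            System.out.println(Objects.hash("BB") + "_" + "BB".length()); // 2143_2
        }
    }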

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/AnomalyRecord.java
@@ -15,6 +15,7 @@
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -353,12 +354,13 @@ public String getJobId() {
* Data store ID of this record.
*/
public String getId() {
-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
-                (overFieldValue == null ? 0 : overFieldValue.length()) +
-                (partitionFieldValue == null ? 0 : partitionFieldValue.length());
+        return buildId(jobId, timestamp, bucketSpan, detectorIndex, byFieldValue, overFieldValue, partitionFieldValue);
+    }

return jobId + "_record_" + timestamp.getTime() + "_" + bucketSpan + "_" + detectorIndex + "_" + valuesHash + "_" + length;
static String buildId(String jobId, Date timestamp, long bucketSpan, int detectorIndex,
String byFieldValue, String overFieldValue, String partitionFieldValue) {
return jobId + "_record_" + timestamp.getTime() + "_" + bucketSpan + "_" + detectorIndex + "_"
+ MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
}

public int getDetectorIndex() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Forecast.java
@@ -13,6 +13,7 @@
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.common.time.TimeUtils;

@@ -165,12 +166,9 @@ public String getForecastId() {
}

public String getId() {
-        int valuesHash = Objects.hash(byFieldValue, partitionFieldValue);
-        int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
-                (partitionFieldValue == null ? 0 : partitionFieldValue.length());
return jobId + "_model_forecast_" + forecastId + "_" + timestamp.getTime()
+ "_" + bucketSpan + "_" + detectorIndex + "_"
-                + valuesHash + "_" + length;
+                + MachineLearningField.valuesToId(byFieldValue, partitionFieldValue);
}

public Date getTimestamp() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/Influencer.java
@@ -13,6 +13,7 @@
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.common.time.TimeUtils;
@@ -134,7 +135,7 @@ public String getJobId() {

public String getId() {
return jobId + "_influencer_" + timestamp.getTime() + "_" + bucketSpan + "_" +
influenceField + "_" + influenceValue.hashCode() + "_" + influenceValue.length();
influenceField + "_" + MachineLearningField.valuesToId(influenceValue);
}

public double getProbability() {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/results/ModelPlot.java
@@ -13,6 +13,7 @@
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.common.time.TimeUtils;

@@ -183,12 +184,8 @@ public String getJobId() {
}

public String getId() {
-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
-                (overFieldValue == null ? 0 : overFieldValue.length()) +
-                (partitionFieldValue == null ? 0 : partitionFieldValue.length());
return jobId + "_model_plot_" + timestamp.getTime() + "_" + bucketSpan
+ "_" + detectorIndex + "_" + valuesHash + "_" + length;
+ "_" + detectorIndex + "_" + MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
}

public Date getTimestamp() {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/results/AnomalyRecordTests.java
@@ -14,17 +14,20 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;

import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class AnomalyRecordTests extends AbstractSerializingTestCase<AnomalyRecord> {

@@ -174,28 +177,23 @@ public void testId() {
String overFieldValue = null;
String partitionFieldValue = null;

-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("test-job_record_1000_60_0_" + valuesHash + "_0", record.getId());
+        assertEquals("test-job_record_1000_60_0_0_0", record.getId());

-        int length = 0;
if (randomBoolean()) {
byFieldValue = randomAlphaOfLength(10);
-            length += byFieldValue.length();
record.setByFieldValue(byFieldValue);
}
if (randomBoolean()) {
overFieldValue = randomAlphaOfLength(10);
-            length += overFieldValue.length();
record.setOverFieldValue(overFieldValue);
}
if (randomBoolean()) {
partitionFieldValue = randomAlphaOfLength(10);
-            length += partitionFieldValue.length();
record.setPartitionFieldValue(partitionFieldValue);
}

-        valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("test-job_record_1000_60_0_" + valuesHash + "_" + length, record.getId());
+        String valuesPart = MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
+        assertEquals("test-job_record_1000_60_0_" + valuesPart, record.getId());
}

public void testStrictParser_IsLenientOnTopLevelFields() throws IOException {
@@ -222,4 +220,18 @@ public void testLenientParser() throws IOException {
AnomalyRecord.LENIENT_PARSER.apply(parser, null);
}
}

+    public void testIdLength() {
+        String jobId = randomAlphaOfLength(MlStrings.ID_LENGTH_LIMIT);
+        Date timestamp = new Date(Long.MAX_VALUE);
+        long bucketSpan = Long.MAX_VALUE;
+        int detectorIndex = Integer.MAX_VALUE;
+        String byFieldValue = randomAlphaOfLength(randomIntBetween(100, 1000));
+        String overFieldValue = randomAlphaOfLength(randomIntBetween(100, 1000));
+        String partitionFieldValue = randomAlphaOfLength(randomIntBetween(100, 1000));
+
+        String id = AnomalyRecord.buildId(jobId, timestamp, bucketSpan, detectorIndex, byFieldValue, overFieldValue, partitionFieldValue);
+        // 512 comes from IndexRequest.validate()
+        assertThat(id.getBytes(StandardCharsets.UTF_8).length, lessThanOrEqualTo(512));
+    }
}
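
Rough worst-case arithmetic behind the 512-byte assertion, assuming MlStrings.ID_LENGTH_LIMIT is 64: a 64-character job ID, "_record_" (8), Long.MAX_VALUE timestamp and bucket span (19 digits each), an Integer.MAX_VALUE detector index (10), a signed 128-bit BigInteger (at most 40 characters), a length suffix (at most 4 digits for the 3,000 characters of field values generated here), and four "_" separators come to 64 + 8 + 19 + 1 + 19 + 1 + 10 + 1 + 40 + 1 + 4 = 168 ASCII bytes, comfortably inside the 512-byte document ID limit that IndexRequest.validate() enforces.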

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/results/InfluencerTests.java
@@ -12,10 +12,10 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;

import java.io.IOException;
import java.util.Date;
-import java.util.Objects;

public class InfluencerTests extends AbstractSerializingTestCase<Influencer> {

@@ -64,8 +64,8 @@ public void testToXContentDoesNotIncludeNameValueFieldWhenReservedWord() throws
public void testId() {
String influencerFieldValue = "wopr";
Influencer influencer = new Influencer("job-foo", "host", influencerFieldValue, new Date(1000), 300L);
-        int valueHash = Objects.hashCode(influencerFieldValue);
-        assertEquals("job-foo_influencer_1000_300_host_" + valueHash + "_" + influencerFieldValue.length(), influencer.getId());
+        String valuePart = MachineLearningField.valuesToId(influencerFieldValue);
+        assertEquals("job-foo_influencer_1000_300_host_" + valuePart, influencer.getId());
}

public void testLenientParser() throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/results/ForecastTests.java
@@ -9,11 +9,11 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.results.Forecast;

import java.io.IOException;
import java.util.Date;
-import java.util.Objects;

import static org.hamcrest.Matchers.containsString;

@@ -72,23 +72,19 @@ public void testId() {
String byFieldValue = null;
String partitionFieldValue = null;

-        int valuesHash = Objects.hash(byFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_forecast_222_100_60_2_" + valuesHash + "_0", forecast.getId());
+        assertEquals("job-foo_model_forecast_222_100_60_2_0_0", forecast.getId());

-        int length = 0;
if (randomBoolean()) {
byFieldValue = randomAlphaOfLength(10);
-            length += byFieldValue.length();
forecast.setByFieldValue(byFieldValue);
}
if (randomBoolean()) {
partitionFieldValue = randomAlphaOfLength(10);
-            length += partitionFieldValue.length();
forecast.setPartitionFieldValue(partitionFieldValue);
}

-        valuesHash = Objects.hash(byFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_forecast_222_100_60_2_" + valuesHash + "_" + length, forecast.getId());
+        String valuesPart = MachineLearningField.valuesToId(byFieldValue, partitionFieldValue);
+        assertEquals("job-foo_model_forecast_222_100_60_2_" + valuesPart, forecast.getId());
}

public void testStrictParser() throws IOException {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/results/ModelPlotTests.java
@@ -12,11 +12,15 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;

import java.io.IOException;
+import java.util.ArrayList;
import java.util.Date;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
@@ -221,28 +225,23 @@ public void testId() {
String overFieldValue = null;
String partitionFieldValue = null;

-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_plot_100_60_33_" + valuesHash + "_0", plot.getId());
+        assertEquals("job-foo_model_plot_100_60_33_0_0", plot.getId());

-        int length = 0;
if (randomBoolean()) {
byFieldValue = randomAlphaOfLength(10);
-            length += byFieldValue.length();
plot.setByFieldValue(byFieldValue);
}
if (randomBoolean()) {
overFieldValue = randomAlphaOfLength(10);
-            length += overFieldValue.length();
plot.setOverFieldValue(overFieldValue);
}
if (randomBoolean()) {
partitionFieldValue = randomAlphaOfLength(10);
-            length += partitionFieldValue.length();
plot.setPartitionFieldValue(partitionFieldValue);
}

-        valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_plot_100_60_33_" + valuesHash + "_" + length, plot.getId());
+        String valuesPart = MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
+        assertEquals("job-foo_model_plot_100_60_33_" + valuesPart, plot.getId());
}

public void testStrictParser() throws IOException {
@@ -262,6 +261,43 @@ public void testLenientParser() throws IOException {
}
}

+    public void testIdUniqueness() {
+        ModelPlot modelPlot = new ModelPlot("foo", new Date(), 3600, 0);
+
+        String[] partitionFieldValues = { "730", "132", "358", "552", "888", "236", "224", "674",
+                "438", "128", "722", "560", "228", "628", "226", "656" };
+        String[] byFieldValues = { "S000", "S001", "S002", "S003", "S004", "S005", "S006", "S007", "S008", "S009",
+                "S010", "S011", "S012", "S013", "S014", "S015", "S016", "S017", "S018", "S019",
+                "S020", "S021", "S022", "S023", "S024", "S025", "S026", "S027", "S028", "S029",
+                "S057", "S058", "S059", "M020", "M021", "M026", "M027", "M028", "M029", "M030",
+                "M031", "M032", "M033", "M056", "M057", "M058", "M059", "M060", "M061", "M062",
+                "M063", "M086", "M087", "M088", "M089", "M090", "M091", "M092", "M093", "M116",
+                "M117", "M118", "M119", "L012", "L013", "L014", "L017", "L018", "L019", "L023",
+                "L024", "L025", "L029", "L030", "L031" };
+
+        Map<String, List<String>> uniqueIds = new HashMap<>();
+
+        for (String partitionFieldValue : partitionFieldValues) {
+            modelPlot.setPartitionFieldValue(partitionFieldValue);
+            for (String byFieldValue : byFieldValues) {
+                modelPlot.setByFieldValue(byFieldValue);
+                String id = modelPlot.getId();
+                uniqueIds.compute(id, (k, v) -> {
+                    if (v == null) {
+                        v = new ArrayList<>();
+                    }
+                    v.add(partitionFieldValue + "/" + byFieldValue);
+                    if (v.size() > 1) {
+                        logger.error("Duplicates for ID [" + id + "]: " + v);
+                    }
+                    return v;
+                });
+            }
+        }
+
+        assertEquals(partitionFieldValues.length * byFieldValues.length, uniqueIds.size());
+    }

private ModelPlot createFullyPopulated() {
ModelPlot modelPlot = new ModelPlot("foo", new Date(12345678L), 360L, 22);
modelPlot.setByFieldName("by");
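
The grid above exercises valuesToId end to end. Every combination here has the same combined length (3 + 4 = 7), so under the old scheme uniqueness rested entirely on a 32-bit Objects.hash, and any collision inside the grid silently merged two model plot documents. A standalone sketch (not part of this change) that replays the old suffix construction over such a grid and reports duplicates:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Objects;

    public class OldSuffixCheck {
        public static void main(String[] args) {
            // Sample values in the spirit of the test above; any grid works.
            String[] partitions = { "730", "132", "358", "552", "888", "236" };
            String[] byValues = { "S000", "S001", "S002", "S003", "S004", "S005" };
            Map<String, String> seen = new HashMap<>();
            for (String partition : partitions) {
                for (String by : byValues) {
                    // Old ModelPlot suffix: Objects.hash(by, over, partition) + "_" + length,
                    // with overFieldValue null and the combined length fixed at 7 here.
                    String suffix = Objects.hash(by, null, partition) + "_7";
                    String previous = seen.put(suffix, partition + "/" + by);
                    if (previous != null) {
                        System.out.println("Old-style collision: " + previous + " vs " + partition + "/" + by);
                    }
                }
            }
            System.out.println(seen.size() + " distinct suffixes from " + (partitions.length * byValues.length) + " combinations");
        }
    }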