[ML] Switch to typeless APIs in ML native multi-node tests #39574

Merged
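
The diff applies the same typed-to-typeless migration pattern across all four test files: index creation maps fields under the reserved single mapping name rather than a custom type, IndexRequest is constructed from the index alone, and explicit document ids move to a builder-style setter. A minimal before/after sketch of that pattern (the index name, mapping type, field, and id here are placeholders, not values from the tests):

    import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;

    import org.elasticsearch.action.index.IndexRequest;

    // Before: a custom mapping type and the deprecated typed IndexRequest constructor.
    client().admin().indices().prepareCreate("some-index")
        .addMapping("some_type", "time", "type=date,format=epoch_millis")
        .get();
    IndexRequest typed = new IndexRequest("some-index", "some_type", "doc-1");

    // After: the reserved mapping name (SINGLE_MAPPING_NAME resolves to "_doc")
    // and the typeless constructor, with the id set separately.
    client().admin().indices().prepareCreate("some-index")
        .addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis")
        .get();
    IndexRequest typeless = new IndexRequest("some-index").id("doc-1");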
CategorizationIT.java
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Locale;

+import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
@@ -35,38 +36,37 @@
 public class CategorizationIT extends MlNativeAutodetectIntegTestCase {

     private static final String DATA_INDEX = "log-data";
-    private static final String DATA_TYPE = "log";

     private long nowMillis;

     @Before
     public void setUpData() {
         client().admin().indices().prepareCreate(DATA_INDEX)
-            .addMapping(DATA_TYPE, "time", "type=date,format=epoch_millis",
+            .addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis",
                 "msg", "type=text")
             .get();

         nowMillis = System.currentTimeMillis();

         BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
-        IndexRequest indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+        IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
         indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis(),
             "msg", "Node 1 started");
         bulkRequestBuilder.add(indexRequest);
-        indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+        indexRequest = new IndexRequest(DATA_INDEX);
         indexRequest.source("time", nowMillis - TimeValue.timeValueHours(2).millis() + 1,
             "msg", "Failed to shutdown [error org.aaaa.bbbb.Cccc line 54 caused " +
                 "by foo exception]");
         bulkRequestBuilder.add(indexRequest);
-        indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+        indexRequest = new IndexRequest(DATA_INDEX);
         indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis(),
             "msg", "Node 2 started");
         bulkRequestBuilder.add(indexRequest);
-        indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+        indexRequest = new IndexRequest(DATA_INDEX);
         indexRequest.source("time", nowMillis - TimeValue.timeValueHours(1).millis() + 1,
             "msg", "Failed to shutdown [error but this time completely different]");
         bulkRequestBuilder.add(indexRequest);
-        indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+        indexRequest = new IndexRequest(DATA_INDEX);
         indexRequest.source("time", nowMillis, "msg", "Node 3 started");
         bulkRequestBuilder.add(indexRequest);
DatafeedJobsRestIT.java
@@ -308,7 +308,7 @@ public void testLookbackOnlyWithNestedFields() throws Exception {
         client().performRequest(createJobRequest);

         String datafeedId = jobId + "-datafeed";
-        new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").build();
+        new DatafeedBuilder(datafeedId, jobId, "nested-data").build();
         openJob(client(), jobId);

         startDatafeedAndWaitUntilStopped(datafeedId);
@@ -351,7 +351,7 @@ public void testInsufficientSearchPrivilegesOnPut() throws Exception {
         // create a datafeed they DON'T have permission to search the index the datafeed is
         // configured to read
         ResponseException e = expectThrows(ResponseException.class, () ->
-                new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response")
+                new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs")
                         .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN)
                         .build());

@@ -419,7 +419,7 @@ public void testInsufficientSearchPrivilegesOnPutWithRollup() throws Exception {


         ResponseException e = expectThrows(ResponseException.class, () ->
-                new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc")
+                new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup")
                         .setAggregations(aggregations)
                         .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS) //want to search, but no admin access
                         .build());
@@ -449,7 +449,7 @@ public void testInsufficientSearchPrivilegesOnPreview() throws Exception {
         client().performRequest(createJobRequest);

         String datafeedId = "datafeed-" + jobId;
-        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs").build();

         // This should be disallowed, because ml_admin is trying to preview a datafeed created by
         // by another user (x_pack_rest_user in this case) that will reveal the content of an index they
@@ -490,7 +490,7 @@ public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
             + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
             + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
             + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs").setAggregations(aggregations).build();
         openJob(client(), jobId);

         startDatafeedAndWaitUntilStopped(datafeedId);
@@ -529,7 +529,7 @@ public void testLookbackOnlyGivenAggregationsWithDateHistogram() throws Exception {
             + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
             + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
             + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs").setAggregations(aggregations).build();
         openJob(client(), jobId);

         startDatafeedAndWaitUntilStopped(datafeedId);
@@ -568,7 +568,7 @@ public void testLookbackUsingDerivativeAggWithLargerHistogramBucketThanDataRate() throws Exception {
             + "\"aggs\": {\"timestamp\":{\"max\":{\"field\":\"timestamp\"}},"
             + "\"bytes-delta\":{\"derivative\":{\"buckets_path\":\"avg_bytes_out\"}},"
             + "\"avg_bytes_out\":{\"avg\":{\"field\":\"network_bytes_out\"}} }}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "network-data", "doc")
+        new DatafeedBuilder(datafeedId, jobId, "network-data")
                 .setAggregations(aggregations)
                 .setChunkingTimespan("300s")
                 .build();
@@ -614,7 +614,7 @@ public void testLookbackUsingDerivativeAggWithSmallerHistogramBucketThanDataRate() throws Exception {
             + "\"aggs\": {\"timestamp\":{\"max\":{\"field\":\"timestamp\"}},"
             + "\"bytes-delta\":{\"derivative\":{\"buckets_path\":\"avg_bytes_out\"}},"
             + "\"avg_bytes_out\":{\"avg\":{\"field\":\"network_bytes_out\"}} }}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "network-data", "doc")
+        new DatafeedBuilder(datafeedId, jobId, "network-data")
                 .setAggregations(aggregations)
                 .setChunkingTimespan("300s")
                 .build();
@@ -658,7 +658,7 @@ public void testLookbackWithoutPermissions() throws Exception {
             + "\"avg_bytes_out\":{\"avg\":{\"field\":\"network_bytes_out\"}} }}}}}";

         // At the time we create the datafeed the user can access the network-data index that we have access to
-        new DatafeedBuilder(datafeedId, jobId, "network-data", "doc")
+        new DatafeedBuilder(datafeedId, jobId, "network-data")
                 .setAggregations(aggregations)
                 .setChunkingTimespan("300s")
                 .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
@@ -712,7 +712,7 @@ public void testLookbackWithPipelineBucketAgg() throws Exception {
             + "\"airlines\":{\"terms\":{\"field\":\"airline.keyword\",\"size\":10}},"
             + "\"percentile95_airlines_count\":{\"percentiles_bucket\":" +
                 "{\"buckets_path\":\"airlines._count\", \"percents\": [95]}}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").setAggregations(aggregations).build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data").setAggregations(aggregations).build();

         openJob(client(), jobId);

@@ -801,7 +801,7 @@ public void testLookbackOnlyGivenAggregationsWithHistogramAndRollupIndex() throws Exception {
             + "\"aggregations\":{"
             + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
             + "\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "response").setAggregations(aggregations).build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup").setAggregations(aggregations).build();
         openJob(client(), jobId);

         startDatafeedAndWaitUntilStopped(datafeedId);
@@ -872,7 +872,7 @@ public void testLookbackWithoutPermissionsAndRollup() throws Exception {


         // At the time we create the datafeed the user can access the network-data index that we have access to
-        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup", "doc")
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs-rollup")
                 .setAggregations(aggregations)
                 .setChunkingTimespan("300s")
                 .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
@@ -919,7 +919,7 @@ public void testLookbackWithSingleBucketAgg() throws Exception {
             + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
             + "\"airlineFilter\":{\"filter\":{\"term\": {\"airline\":\"AAA\"}},"
             + " \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs").setAggregations(aggregations).build();
         openJob(client(), jobId);

         startDatafeedAndWaitUntilStopped(datafeedId);
@@ -936,7 +936,7 @@ public void testRealtime() throws Exception {
         String jobId = "job-realtime-1";
         createJob(jobId, "airline");
         String datafeedId = jobId + "-datafeed";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data").build();
         openJob(client(), jobId);

         Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
@@ -994,7 +994,7 @@ public void testForceDeleteWhileDatafeedIsRunning() throws Exception {
         String jobId = "job-realtime-2";
         createJob(jobId, "airline");
         String datafeedId = jobId + "-datafeed";
-        new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
+        new DatafeedBuilder(datafeedId, jobId, "airline-data").build();
         openJob(client(), jobId);

         Request startRequest = new Request("POST", MachineLearning.BASE_PATH + "datafeeds/" + datafeedId + "/_start");
@@ -1059,7 +1059,7 @@ public LookbackOnlyTestHelper setShouldSucceedProcessing(boolean value) {
         public void execute() throws Exception {
             createJob(jobId, airlineVariant);
             String datafeedId = "datafeed-" + jobId;
-            new DatafeedBuilder(datafeedId, jobId, dataIndex, "response")
+            new DatafeedBuilder(datafeedId, jobId, dataIndex)
                     .setScriptedFields(addScriptedFields ?
                             "{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : null)
                     .build();
@@ -1159,18 +1159,16 @@ private static class DatafeedBuilder {
         String datafeedId;
         String jobId;
         String index;
-        String type;
         boolean source;
         String scriptedFields;
         String aggregations;
         String authHeader = BASIC_AUTH_VALUE_SUPER_USER;
         String chunkingTimespan;

-        DatafeedBuilder(String datafeedId, String jobId, String index, String type) {
+        DatafeedBuilder(String datafeedId, String jobId, String index) {
             this.datafeedId = datafeedId;
             this.jobId = jobId;
             this.index = index;
-            this.type = type;
         }

         DatafeedBuilder setSource(boolean enableSource) {
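DatafeedBuilder is a private helper in this test class, so the constructor change above is what drives every call-site edit in the earlier hunks. For reference, a typical call combining the remaining builder options now reads as follows (a sketch assembled from values in the hunks above, not compilable on its own):

    new DatafeedBuilder(datafeedId, jobId, "network-data")  // index is now the final constructor argument
        .setAggregations(aggregations)        // optional aggregation JSON
        .setChunkingTimespan("300s")          // optional chunking override
        .setAuthHeader(BASIC_AUTH_VALUE_ML_ADMIN_WITH_SOME_DATA_ACCESS)
        .build();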
DeleteExpiredDataIT.java
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.concurrent.TimeUnit;

+import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -49,12 +50,11 @@
 public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {

     private static final String DATA_INDEX = "delete-expired-data-test-data";
-    private static final String DATA_TYPE = "doc";

     @Before
     public void setUpData() throws IOException {
         client().admin().indices().prepareCreate(DATA_INDEX)
-            .addMapping(DATA_TYPE, "time", "type=date,format=epoch_millis")
+            .addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis")
             .get();

         // We are going to create data for last 2 days
@@ -68,7 +68,7 @@ public void setUpData() throws IOException {
             long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
             int bucketRate = bucket == anomalousBucket ? anomalousRate : normalRate;
             for (int point = 0; point < bucketRate; point++) {
-                IndexRequest indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+                IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
                 indexRequest.source("time", timestamp);
                 bulkRequestBuilder.add(indexRequest);
             }
@@ -97,7 +97,7 @@ public void testDeleteExpiredData() throws Exception {
         bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
         for (int i = 0; i < 10010; i++) {
             String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
-            IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias(), "doc", docId);
+            IndexRequest indexRequest = new IndexRequest(AnomalyDetectorsIndex.jobStateIndexWriteAlias()).id(docId);
             indexRequest.source(Collections.emptyMap());
             bulkRequestBuilder.add(indexRequest);
         }
ModelPlotsIT.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 import java.util.stream.Collectors;

+import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
@@ -36,12 +37,11 @@
 public class ModelPlotsIT extends MlNativeAutodetectIntegTestCase {

     private static final String DATA_INDEX = "model-plots-test-data";
-    private static final String DATA_TYPE = "doc";

     @Before
     public void setUpData() {
         client().admin().indices().prepareCreate(DATA_INDEX)
-            .addMapping(DATA_TYPE, "time", "type=date,format=epoch_millis", "user", "type=keyword")
+            .addMapping(SINGLE_MAPPING_NAME, "time", "type=date,format=epoch_millis", "user", "type=keyword")
             .get();

         List<String> users = Arrays.asList("user_1", "user_2", "user_3");
@@ -53,7 +53,7 @@ public void setUpData() {
         for (int bucket = 0; bucket < totalBuckets; bucket++) {
             long timestamp = nowMillis - TimeValue.timeValueHours(totalBuckets - bucket).getMillis();
             for (String user : users) {
-                IndexRequest indexRequest = new IndexRequest(DATA_INDEX, DATA_TYPE);
+                IndexRequest indexRequest = new IndexRequest(DATA_INDEX);
                 indexRequest.source("time", timestamp, "user", user);
                 bulkRequestBuilder.add(indexRequest);
             }