Remove the ability to update datafeed's job_id. #44752


Merged

merged 4 commits on Jul 31, 2019
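In short, a datafeed's job_id can no longer be changed through the update APIs. On the client side, the high-level REST client's DatafeedUpdate drops its job_id field and the Builder.setJobId method (first file below). A rough sketch of what an update looks like after this change; the datafeed id and index name are made-up examples, not taken from this PR:

    import java.util.Collections;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.ml.UpdateDatafeedRequest;
    import org.elasticsearch.client.ml.datafeed.DatafeedUpdate;
    import org.elasticsearch.common.unit.TimeValue;

    // Sketch only: after this PR the client-side builder exposes no setJobId(...) at all,
    // so only the other datafeed settings remain updatable.
    DatafeedUpdate update = DatafeedUpdate.builder("datafeed-example")            // hypothetical datafeed id
            .setIndices(Collections.singletonList("example-data-index"))          // hypothetical index name
            .setQueryDelay(TimeValue.timeValueSeconds(60))
            .build();

    // The request is then sent exactly as before, e.g. via the high level REST client:
    // client.machineLearning().updateDatafeed(new UpdateDatafeedRequest(update), RequestOptions.DEFAULT);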
@@ -18,7 +18,6 @@
*/
package org.elasticsearch.client.ml.datafeed;

import org.elasticsearch.client.ml.job.config.Job;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@@ -57,7 +56,6 @@ public class DatafeedUpdate implements ToXContentObject {
static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), DatafeedConfig.ID);

PARSER.declareString(Builder::setJobId, Job.ID);
PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDEXES);
PARSER.declareStringArray(Builder::setIndices, DatafeedConfig.INDICES);
PARSER.declareString((builder, val) -> builder.setQueryDelay(
@@ -88,7 +86,6 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
}

private final String id;
private final String jobId;
private final TimeValue queryDelay;
private final TimeValue frequency;
private final List<String> indices;
@@ -99,11 +96,10 @@ private static BytesReference parseBytes(XContentParser parser) throws IOExcepti
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;

private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
private DatafeedUpdate(String id, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
this.frequency = frequency;
this.indices = indices;
@@ -126,7 +122,6 @@ public String getId() {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(DatafeedConfig.ID.getPreferredName(), id);
addOptionalField(builder, Job.ID, jobId);
if (queryDelay != null) {
builder.field(DatafeedConfig.QUERY_DELAY.getPreferredName(), queryDelay.getStringRep());
}
@@ -162,10 +157,6 @@ private void addOptionalField(XContentBuilder builder, ParseField field, Object
}
}

public String getJobId() {
return jobId;
}

public TimeValue getQueryDelay() {
return queryDelay;
}
@@ -228,7 +219,6 @@ public boolean equals(Object other) {
DatafeedUpdate that = (DatafeedUpdate) other;

return Objects.equals(this.id, that.id)
&& Objects.equals(this.jobId, that.jobId)
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.queryDelay, that.queryDelay)
&& Objects.equals(this.indices, that.indices)
@@ -247,7 +237,7 @@ public boolean equals(Object other) {
*/
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
return Objects.hash(id, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig);
}

@@ -258,7 +248,6 @@ public static Builder builder(String id) {
public static class Builder {

private String id;
private String jobId;
private TimeValue queryDelay;
private TimeValue frequency;
private List<String> indices;
@@ -275,7 +264,6 @@ public Builder(String id) {

public Builder(DatafeedUpdate config) {
this.id = config.id;
this.jobId = config.jobId;
this.queryDelay = config.queryDelay;
this.frequency = config.frequency;
this.indices = config.indices;
@@ -287,11 +275,6 @@ public Builder(DatafeedUpdate config) {
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
}

public Builder setJobId(String jobId) {
this.jobId = jobId;
return this;
}

public Builder setIndices(List<String> indices) {
this.indices = indices;
return this;
@@ -364,7 +347,7 @@ public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheck
}

public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
return new DatafeedUpdate(id, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig);
}
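On the server side (the x-pack core DatafeedUpdate further down in this diff), the previous deprecation warning on job_id updates is replaced by a hard failure. Condensed from that hunk, not new code, the guard in apply() now reads roughly:

    if (jobId != null) {
        // A job_id may still be supplied in the update, but only as a no-op.
        if (datafeedConfig.getJobId() != null && datafeedConfig.getJobId().equals(jobId) == false) {
            // Surfaces as a 400 Bad Request: "Datafeed's job_id cannot be changed."
            throw ExceptionsHelper.badRequestException(ERROR_MESSAGE_ON_JOB_ID_UPDATE);
        }
        builder.setJobId(jobId);
    }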

@@ -463,31 +463,6 @@ public void testUpdateDatafeed() throws Exception {
assertThat(datafeedUpdate.getScrollSize(), equalTo(updatedDatafeed.getScrollSize()));
}

public void testUpdateDatafeed_UpdatingJobIdIsDeprecated() throws Exception {
MachineLearningClient machineLearningClient = highLevelClient().machineLearning();

String jobId = randomValidJobId();
Job job = buildJob(jobId);
execute(new PutJobRequest(job), machineLearningClient::putJob, machineLearningClient::putJobAsync);

String anotherJobId = randomValidJobId();
Job anotherJob = buildJob(anotherJobId);
execute(new PutJobRequest(anotherJob), machineLearningClient::putJob, machineLearningClient::putJobAsync);

String datafeedId = "datafeed-" + jobId;
DatafeedConfig datafeedConfig = DatafeedConfig.builder(datafeedId, jobId).setIndices("some_data_index").build();
execute(new PutDatafeedRequest(datafeedConfig), machineLearningClient::putDatafeed, machineLearningClient::putDatafeedAsync);

DatafeedUpdate datafeedUpdateWithChangedJobId = DatafeedUpdate.builder(datafeedId).setJobId(anotherJobId).build();
WarningFailureException exception = expectThrows(
WarningFailureException.class,
() -> execute(
new UpdateDatafeedRequest(datafeedUpdateWithChangedJobId),
machineLearningClient::updateDatafeed,
machineLearningClient::updateDatafeedAsync));
assertThat(exception.getResponse().getWarnings(), contains("The ability to update a datafeed's job_id is deprecated."));
}

public void testGetDatafeed() throws Exception {
String jobId1 = "test-get-datafeed-job-1";
String jobId2 = "test-get-datafeed-job-2";
@@ -34,9 +34,6 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate

public static DatafeedUpdate createRandom() {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(DatafeedConfigTests.randomValidDatafeedId());
if (randomBoolean()) {
builder.setJobId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
}
@@ -5,14 +5,12 @@
*/
package org.elasticsearch.xpack.core.ml.datafeed;

import org.apache.logging.log4j.LogManager;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectParser;
@@ -46,8 +44,7 @@
*/
public class DatafeedUpdate implements Writeable, ToXContentObject {

private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger(DatafeedUpdate.class));
private static final String DEPRECATION_MESSAGE_ON_JOB_ID_UPDATE = "The ability to update a datafeed's job_id is deprecated.";
static final String ERROR_MESSAGE_ON_JOB_ID_UPDATE = "Datafeed's job_id cannot be changed.";

public static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("datafeed_update", Builder::new);

@@ -110,9 +107,6 @@ private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
if (jobId != null) {
deprecationLogger.deprecated(DEPRECATION_MESSAGE_ON_JOB_ID_UPDATE);
}
}

public DatafeedUpdate(StreamInput in) throws IOException {
Expand Down Expand Up @@ -298,6 +292,9 @@ public DatafeedConfig apply(DatafeedConfig datafeedConfig, Map<String, String> h

DatafeedConfig.Builder builder = new DatafeedConfig.Builder(datafeedConfig);
if (jobId != null) {
if (datafeedConfig.getJobId() != null && datafeedConfig.getJobId().equals(jobId) == false) {
throw ExceptionsHelper.badRequestException(ERROR_MESSAGE_ON_JOB_ID_UPDATE);
}
builder.setJobId(jobId);
}
if (queryDelay != null) {
@@ -30,7 +30,7 @@ public void setUpDatafeedId() {

@Override
protected Request createTestInstance() {
return new Request(DatafeedUpdateTests.createRandomized(datafeedId, null, false));
return new Request(DatafeedUpdateTests.createRandomized(datafeedId));
}

@Override
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.core.ml.datafeed;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -23,6 +24,7 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationBuilders;
@@ -77,14 +79,11 @@ protected DatafeedUpdate createTestInstance() {
}

public static DatafeedUpdate createRandomized(String datafeedId) {
return createRandomized(datafeedId, null, true);
return createRandomized(datafeedId, null);
}

public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed, boolean canSetJobId) {
public static DatafeedUpdate createRandomized(String datafeedId, @Nullable DatafeedConfig datafeed) {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(datafeedId);
if (randomBoolean() && datafeed == null && canSetJobId) {
builder.setJobId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
builder.setQueryDelay(TimeValue.timeValueMillis(randomIntBetween(1, Integer.MAX_VALUE)));
}
@@ -197,6 +196,24 @@ public void testApply_failBecauseTargetDatafeedHasDifferentId() {
expectThrows(IllegalArgumentException.class, () -> createRandomized(datafeed.getId() + "_2").apply(datafeed, null));
}

public void testApply_failBecauseJobIdChanged() {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");

DatafeedUpdate datafeedUpdateWithUnchangedJobId = new DatafeedUpdate.Builder(datafeed.getId())
.setJobId("foo")
.build();
DatafeedConfig updatedDatafeed = datafeedUpdateWithUnchangedJobId.apply(datafeed, Collections.emptyMap());
assertThat(updatedDatafeed, equalTo(datafeed));

DatafeedUpdate datafeedUpdateWithChangedJobId = new DatafeedUpdate.Builder(datafeed.getId())
.setJobId("bar")
.build();
ElasticsearchStatusException ex = expectThrows(
ElasticsearchStatusException.class, () -> datafeedUpdateWithChangedJobId.apply(datafeed, Collections.emptyMap()));
assertThat(ex.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(ex.getMessage(), equalTo(DatafeedUpdate.ERROR_MESSAGE_ON_JOB_ID_UPDATE));
}

public void testApply_givenEmptyUpdate() {
DatafeedConfig datafeed = DatafeedConfigTests.createRandomizedDatafeedConfig("foo");
DatafeedConfig updatedDatafeed = new DatafeedUpdate.Builder(datafeed.getId()).build().apply(datafeed, Collections.emptyMap());
@@ -223,7 +240,6 @@ public void testApply_givenFullUpdateNoAggregations() {
DatafeedConfig datafeed = datafeedBuilder.build();
QueryProvider queryProvider = createRandomValidQueryProvider("a", "b");
DatafeedUpdate.Builder update = new DatafeedUpdate.Builder(datafeed.getId());
update.setJobId("bar");
update.setIndices(Collections.singletonList("i_2"));
update.setQueryDelay(TimeValue.timeValueSeconds(42));
update.setFrequency(TimeValue.timeValueSeconds(142));
Expand All @@ -235,7 +251,7 @@ public void testApply_givenFullUpdateNoAggregations() {

DatafeedConfig updatedDatafeed = update.build().apply(datafeed, Collections.emptyMap());

assertThat(updatedDatafeed.getJobId(), equalTo("bar"));
assertThat(updatedDatafeed.getJobId(), equalTo("foo-feed"));
assertThat(updatedDatafeed.getIndices(), equalTo(Collections.singletonList("i_2")));
assertThat(updatedDatafeed.getQueryDelay(), equalTo(TimeValue.timeValueSeconds(42)));
assertThat(updatedDatafeed.getFrequency(), equalTo(TimeValue.timeValueSeconds(142)));
@@ -276,9 +292,9 @@ public void testApply_GivenRandomUpdates_AssertImmutability() {
withoutAggs.setAggProvider(null);
datafeed = withoutAggs.build();
}
DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed, true);
DatafeedUpdate update = createRandomized(datafeed.getId(), datafeed);
while (update.isNoop(datafeed)) {
update = createRandomized(datafeed.getId(), datafeed, true);
update = createRandomized(datafeed.getId(), datafeed);
}

DatafeedConfig updatedDatafeed = update.apply(datafeed, Collections.emptyMap());
@@ -339,12 +355,9 @@ public void testSerializationOfComplexAggsBetweenVersions() throws IOException {
@Override
protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
switch (between(0, 9)) {
case 0:
builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
break;
switch (between(1, 9)) {
case 1:
builder.setJobId(instance.getJobId() + randomAlphaOfLength(5));
builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
break;
case 2:
if (instance.getQueryDelay() == null) {
@@ -10,7 +10,6 @@
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodeHotThreads;
import org.elasticsearch.action.admin.cluster.node.hotthreads.NodesHotThreadsResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -137,48 +136,6 @@ public void testDatafeedTimingStats_DatafeedRecreated() throws Exception {
openAndRunJob.run();
}

public void testDatafeedTimingStats_DatafeedJobIdUpdated() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs = randomIntBetween(32, 2048);
Instant now = Instant.now();
indexDocs(logger, "data", numDocs, now.minus(Duration.ofDays(14)).toEpochMilli(), now.toEpochMilli());

Job.Builder jobA = createScheduledJob("lookback-job-jobid-updated");
Job.Builder jobB = createScheduledJob("other-lookback-job-jobid-updated");
for (Job.Builder job : Arrays.asList(jobA, jobB)) {
registerJob(job);
putJob(job);
}

String datafeedId = "lookback-datafeed";
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, jobA.getId(), Arrays.asList("data"));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);

CheckedConsumer<Job.Builder, Exception> openAndRunJob = job -> {
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
// Bind datafeedId to the current job on the list, timing stats are wiped out.
// Datafeed did not do anything yet, hence search_count is equal to 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), equalTo(0L));
startDatafeed(datafeedId, 0L, now.toEpochMilli());
assertBusy(() -> {
assertThat(getDataCounts(job.getId()).getProcessedRecordCount(), equalTo(numDocs));
// Datafeed processed numDocs documents so search_count must be greater than 0.
assertDatafeedStats(datafeedId, DatafeedState.STOPPED, job.getId(), greaterThan(0L));
}, 60, TimeUnit.SECONDS);
waitUntilJobIsClosed(job.getId());
};

openAndRunJob.accept(jobA);
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobB.getId()).build()); // wipes out timing stats
openAndRunJob.accept(jobB);
updateDatafeed(new DatafeedUpdate.Builder(datafeedId).setJobId(jobA.getId()).build()); // wipes out timing stats
openAndRunJob.accept(jobA);
}

public void testDatafeedTimingStats_QueryDelayUpdated_TimingStatsNotReset() throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")