[ML] Audit job open failures and stop any corresponding datafeed #80665

Merged
@@ -63,6 +63,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.junit.After;
@@ -542,7 +543,108 @@ public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
}

public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception {
internalCluster().ensureAtMostNumDataNodes(0);
logger.info("Starting dedicated master node...");
internalCluster().startMasterOnlyNode();
logger.info("Starting ml and data node...");
internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE)));
logger.info("Starting another ml and data node...");
internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE)));
ensureStableCluster();

// index some datafeed data
client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get();
long numDocs = 80000;
long now = System.currentTimeMillis();
long weekAgo = now - 604800000;
long twoWeeksAgo = weekAgo - 604800000;
indexDocs(logger, "data", numDocs, twoWeeksAgo, weekAgo);

String jobId = "test-node-goes-down-while-running-job";
String datafeedId = jobId + "-datafeed";

Job.Builder job = createScheduledJob(jobId);
PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();

DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), TimeValue.timeValueHours(1));
PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();

client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));

assertBusy(() -> {
GetJobsStatsAction.Response statsResponse = client().execute(
GetJobsStatsAction.INSTANCE,
new GetJobsStatsAction.Request(job.getId())
).actionGet();
assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
}, 30, TimeUnit.SECONDS);

DiscoveryNode nodeRunningJob = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId()))
.actionGet()
.getResponse()
.results()
.get(0)
.getNode();

setMlIndicesDelayedNodeLeftTimeoutToZero();

StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();

waitForJobToHaveProcessedAtLeast(jobId, 1000);

// The datafeed should be started
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse = client().execute(
GetDatafeedsStatsAction.INSTANCE,
new GetDatafeedsStatsAction.Request(config.getId())
).actionGet();
assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
}, 30, TimeUnit.SECONDS);

// Create a problem that will make the job fail when it restarts on a different node
String snapshotId = "123";
ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
JobResultsPersister jobResultsPersister = internalCluster().getInstance(
JobResultsPersister.class,
internalCluster().getMasterName()
);
jobResultsPersister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE, () -> true);
UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal(
jobId,
new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build()
);
client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet();
refresh(AnomalyDetectorsIndex.resultsWriteAlias(jobId));

// Make the job move to a different node
internalCluster().stopNode(nodeRunningJob.getName());

// Wait for the job to fail during reassignment
assertBusy(() -> {
GetJobsStatsAction.Response statsResponse = client().execute(
GetJobsStatsAction.INSTANCE,
new GetJobsStatsAction.Request(job.getId())
).actionGet();
assertEquals(JobState.FAILED, statsResponse.getResponse().results().get(0).getState());
}, 30, TimeUnit.SECONDS);

// The datafeed should then be stopped
assertBusy(() -> {
GetDatafeedsStatsAction.Response statsResponse = client().execute(
GetDatafeedsStatsAction.INSTANCE,
new GetDatafeedsStatsAction.Request(config.getId())
).actionGet();
assertEquals(DatafeedState.STOPPED, statsResponse.getResponse().results().get(0).getDatafeedState());
}, 30, TimeUnit.SECONDS);

// Force close the failed job to clean up
client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId).setForce(true)).actionGet();
}
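
Note: waitForJobToHaveProcessedAtLeast and ensureStableCluster are helpers defined elsewhere in this test class or its base class and are not part of this diff. A plausible shape for the former, assuming it follows the same GetJobsStatsAction polling pattern as the assertions above (DataCounts and Hamcrest's greaterThanOrEqualTo assumed to be imported):

private void waitForJobToHaveProcessedAtLeast(String jobId, long numDocs) throws Exception {
    assertBusy(() -> {
        GetJobsStatsAction.Response statsResponse = client().execute(
            GetJobsStatsAction.INSTANCE,
            new GetJobsStatsAction.Request(jobId)
        ).actionGet();
        // Poll the job's data counts until enough records have been processed
        DataCounts dataCounts = statsResponse.getResponse().results().get(0).getDataCounts();
        assertThat(dataCounts.getProcessedRecordCount(), greaterThanOrEqualTo(numDocs));
    }, 30, TimeUnit.SECONDS);
}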

private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception {
@@ -145,7 +145,7 @@ private AssignmentFailure checkAssignment() {
}

if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
// lets try again later when the job has been opened:
// let's try again later when the job has been opened:
String reason = "cannot start datafeed ["
+ datafeedId
+ "], because the job's ["
@@ -404,13 +404,7 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
// Step 3. Set scheduled events on message and write update process message
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(events -> {
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> handler.accept(e));
Review comment (Member): lulz, accept null if null
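
To spell out the reviewer's point: the removed branch passed e through unchanged whether or not it was null, so the callback collapses to a plain pass-through. A self-contained toy illustration (class and method names invented for this note, not taken from the PR):

import java.util.function.BiConsumer;
import java.util.function.Consumer;

class CallbackEquivalence {

    // The removed code, reduced to its essentials.
    static BiConsumer<Void, Exception> verbose(Consumer<Exception> handler) {
        return (aVoid, e) -> {
            if (e == null) {
                handler.accept(null); // e is null here, so this is handler.accept(e)
            } else {
                handler.accept(e);
            }
        };
    }

    // The replacement: behaviourally identical for both null and non-null e.
    static BiConsumer<Void, Exception> terse(Consumer<Exception> handler) {
        return (aVoid, e) -> handler.accept(e);
    }
}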

}, handler);

// Step 2. Set the filters on the message and get scheduled events
@@ -545,20 +539,18 @@ public void openJob(

// Start the process
ActionListener<Boolean> stateAliasHandler = ActionListener.wrap(
r -> {
jobManager.getJob(
jobId,
ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
);
},
r -> jobManager.getJob(
jobId,
ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
),
e -> {
if (ExceptionsHelper.unwrapCause(e) instanceof InvalidAliasNameException) {
String msg = "Detected a problem with your setup of machine learning, the state index alias ["
+ AnomalyDetectorsIndex.jobStateIndexWriteAlias()
+ "] exists as index but must be an alias.";
logger.error(new ParameterizedMessage("[{}] {}", jobId, msg), e);
auditor.error(jobId, msg);
setJobState(jobTask, JobState.FAILED, msg, e2 -> closeHandler.accept(e, true));
// The close handler is responsible for auditing this and setting the job state to failed
closeHandler.accept(new IllegalStateException(msg, e), true);
} else {
closeHandler.accept(e, true);
}
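
The shape of this change: rather than auditing and setting FAILED inline for the bad-alias case, the problem is wrapped in an IllegalStateException and handed to the same close handler as every other open failure, so the failure handling lives in one place. A self-contained toy of the pattern (names invented for this note, not from the PR):

import java.util.function.BiConsumer;

class FailureFunnel {

    // Every failure path reports through the single handler supplied by the caller,
    // which keeps auditing and task-state changes in one place.
    static void open(boolean stateAliasIsAnIndex, BiConsumer<Exception, Boolean> closeHandler) {
        if (stateAliasIsAnIndex) {
            String msg = "the state index alias exists as an index but must be an alias";
            closeHandler.accept(new IllegalStateException(msg), true);
            return;
        }
        // ... start the process, then signal success ...
        closeHandler.accept(null, true);
    }
}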
@@ -39,6 +39,7 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.core.ml.job.config.Blocked;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
@@ -298,8 +299,8 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams
return;
}

ActionListener<Boolean> hasRunningDatafeedTaskListener = ActionListener.wrap(hasRunningDatafeed -> {
if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
if (runningDatafeedId != null && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {

// This job has a running datafeed attached to it.
// In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
@@ -319,45 +320,84 @@
}
});

hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
getRunningDatafeed(jobTask.getJobId(), getRunningDatafeedListener);
}

private void failTask(JobTask jobTask, String reason) {
String jobId = jobTask.getJobId();
auditor.error(jobId, reason);
JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason);
jobTask.updatePersistentTaskState(
failedState,
ActionListener.wrap(
r -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())),
e -> {
logger.error(
new ParameterizedMessage(
"[{}] error while setting task state to failed; marking task as failed",
jobTask.getJobId()
),
e
);
jobTask.markAsFailed(e);
}
)
);
jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> {
logger.debug("[{}] updated task state to failed", jobId);
stopAssociatedDatafeedForFailedJob(jobId);
}, e -> {
logger.error(new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobId), e);
jobTask.markAsFailed(e);
stopAssociatedDatafeedForFailedJob(jobId);
}));
}

private void stopAssociatedDatafeedForFailedJob(String jobId) {

if (autodetectProcessManager.isNodeDying()) {
// The node shutdown caught us at a bad time, and we cannot stop the datafeed
return;
}

ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
if (runningDatafeedId == null) {
return;
}
StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId);
request.setForce(true);
executeAsyncWithOrigin(
client,
ML_ORIGIN,
StopDatafeedAction.INSTANCE,
request,
ActionListener.wrap(
// StopDatafeedAction will audit the stopping of the datafeed if it succeeds so we don't need to do that here
r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId),
e -> {
if (autodetectProcessManager.isNodeDying() == false) {
logger.error(
new ParameterizedMessage(
"[{}] failed to stop associated datafeed [{}] after job failure",
jobId,
runningDatafeedId
),
e
);
auditor.error(jobId, "failed to stop associated datafeed after job failure");
}
}
)
);
}, e -> {
if (autodetectProcessManager.isNodeDying() == false) {
logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobId), e);
}
});

getRunningDatafeed(jobId, getRunningDatafeedListener);
}
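
// How the pieces above chain together (descriptive summary, not code from this PR):
//   failTask(jobTask, reason)
//     -> audits the reason and moves the persistent task to JobState.FAILED
//     -> stopAssociatedDatafeedForFailedJob(jobId), attempted whether or not the state update
//        succeeds, and skipped entirely if the node is shutting down
//          -> getRunningDatafeed(jobId) resolves the single datafeed (if any) attached to the job
//          -> StopDatafeedAction with force=true, presumably so the stop cannot be blocked by the
//             failed job; a successful stop is audited by StopDatafeedAction itself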

private boolean isMasterNodeVersionOnOrAfter(Version version) {
return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version);
}

private void hasRunningDatafeedTask(String jobId, ActionListener<Boolean> listener) {
private void getRunningDatafeed(String jobId, ActionListener<String> listener) {
ActionListener<Set<String>> datafeedListener = ActionListener.wrap(datafeeds -> {
assert datafeeds.size() <= 1;
if (datafeeds.isEmpty()) {
listener.onResponse(false);
listener.onResponse(null);
return;
}

String datafeedId = datafeeds.iterator().next();
PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
listener.onResponse(datafeedTask != null);
listener.onResponse(datafeedTask != null ? datafeedId : null);
}, listener::onFailure);

datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singleton(jobId), datafeedListener);
@@ -504,7 +544,7 @@ private void openJob(JobTask jobTask) {
}
} else if (autodetectProcessManager.isNodeDying() == false) {
logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2);
failTask(jobTask, "failed to open job");
failTask(jobTask, "failed to open job: " + e2.getMessage());
}
});
}