
Commit 847db07

[ML] Audit job open failures and stop any corresponding datafeed (elastic#80665)
The anomaly detection code contained an assumption, dating back to 2016, that if a job failed then its datafeed would notice and stop itself. That works if the job fails on a node after it has successfully started up, but it does not work if the job fails during its startup sequence. If the job is being started for the first time then the datafeed will not be running, so there is no problem; but if the job fails while it is being reassigned to a new node then the assumption breaks down, because at that instant the datafeed is started but not assigned to any node.

This PR addresses this by making the job force-stop its own datafeed if it fails during its startup sequence while the datafeed is started. Fixes elastic#48934

Additionally, auditing of job failures during the startup sequence is moved so that it happens for all failure scenarios instead of just one. Fixes elastic#80621
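In outline, the fix lives in OpenJobPersistentTasksExecutor (full diff below): when a job task is marked FAILED during its startup sequence, the executor now audits the reason and, if the job's datafeed is started, force-stops it rather than relying on the datafeed to notice the failure. A condensed sketch of that path follows; the node-shutdown (isNodeDying) checks, the auditing of stop failures, and the debug logging are trimmed, so this is illustrative rather than the verbatim code:

// Condensed from the OpenJobPersistentTasksExecutor change below; not the verbatim implementation.
private void failTask(JobTask jobTask, String reason) {
    String jobId = jobTask.getJobId();
    auditor.error(jobId, reason);  // audit every startup failure, not just one scenario
    JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason);
    jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> {
        stopAssociatedDatafeedForFailedJob(jobId);
    }, e -> {
        jobTask.markAsFailed(e);
        stopAssociatedDatafeedForFailedJob(jobId);
    }));
}

private void stopAssociatedDatafeedForFailedJob(String jobId) {
    // The datafeed cannot stop itself here: it is started but not assigned to any node
    // while the job is failing its startup sequence, so the job force-stops it instead.
    getRunningDatafeed(jobId, ActionListener.wrap(runningDatafeedId -> {
        if (runningDatafeedId == null) {
            return;  // no datafeed for this job, or the datafeed is not started
        }
        StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId);
        request.setForce(true);
        executeAsyncWithOrigin(client, ML_ORIGIN, StopDatafeedAction.INSTANCE, request, ActionListener.wrap(
            r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId),
            e -> logger.error("[" + jobId + "] failed to stop associated datafeed [" + runningDatafeedId + "] after job failure", e)
        ));
    }, e -> logger.error("[" + jobId + "] failed to look up the datafeed for the failed job", e)));
}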
1 parent 3f925ae commit 847db07

File tree

4 files changed: +173 -39 lines changed

x-pack/plugin/ml/src/internalClusterTest/java/org/elasticsearch/xpack/ml/integration/MlDistributedFailureIT.java

Lines changed: 102 additions & 0 deletions
@@ -63,6 +63,7 @@
 import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
 import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
 import org.elasticsearch.xpack.ml.MachineLearning;
+import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
 import org.elasticsearch.xpack.ml.job.process.autodetect.BlackHoleAutodetectProcess;
 import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
 import org.junit.After;
@@ -559,7 +560,108 @@ public void testClusterWithTwoMlNodes_RunsDatafeed_GivenOriginalNodeGoesDown() throws Exception {
             assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs));
             assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
         });
+    }
+
+    public void testClusterWithTwoMlNodes_StopsDatafeed_GivenJobFailsOnReassign() throws Exception {
+        internalCluster().ensureAtMostNumDataNodes(0);
+        logger.info("Starting dedicated master node...");
+        internalCluster().startMasterOnlyNode();
+        logger.info("Starting ml and data node...");
+        internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE)));
+        logger.info("Starting another ml and data node...");
+        internalCluster().startNode(onlyRoles(Set.of(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.ML_ROLE)));
+        ensureStableCluster();
+
+        // index some datafeed data
+        client().admin().indices().prepareCreate("data").setMapping("time", "type=date").get();
+        long numDocs = 80000;
+        long now = System.currentTimeMillis();
+        long weekAgo = now - 604800000;
+        long twoWeeksAgo = weekAgo - 604800000;
+        indexDocs(logger, "data", numDocs, twoWeeksAgo, weekAgo);
+
+        String jobId = "test-node-goes-down-while-running-job";
+        String datafeedId = jobId + "-datafeed";
+
+        Job.Builder job = createScheduledJob(jobId);
+        PutJobAction.Request putJobRequest = new PutJobAction.Request(job);
+        client().execute(PutJobAction.INSTANCE, putJobRequest).actionGet();
+
+        DatafeedConfig config = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"), TimeValue.timeValueHours(1));
+        PutDatafeedAction.Request putDatafeedRequest = new PutDatafeedAction.Request(config);
+        client().execute(PutDatafeedAction.INSTANCE, putDatafeedRequest).actionGet();
+
+        client().execute(OpenJobAction.INSTANCE, new OpenJobAction.Request(job.getId()));
+
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse = client().execute(
+                GetJobsStatsAction.INSTANCE,
+                new GetJobsStatsAction.Request(job.getId())
+            ).actionGet();
+            assertEquals(JobState.OPENED, statsResponse.getResponse().results().get(0).getState());
+        }, 30, TimeUnit.SECONDS);
+
+        DiscoveryNode nodeRunningJob = client().execute(GetJobsStatsAction.INSTANCE, new GetJobsStatsAction.Request(job.getId()))
+            .actionGet()
+            .getResponse()
+            .results()
+            .get(0)
+            .getNode();
+
+        setMlIndicesDelayedNodeLeftTimeoutToZero();
+
+        StartDatafeedAction.Request startDatafeedRequest = new StartDatafeedAction.Request(config.getId(), 0L);
+        client().execute(StartDatafeedAction.INSTANCE, startDatafeedRequest).get();
+
+        waitForJobToHaveProcessedAtLeast(jobId, 1000);
+
+        // The datafeed should be started
+        assertBusy(() -> {
+            GetDatafeedsStatsAction.Response statsResponse = client().execute(
+                GetDatafeedsStatsAction.INSTANCE,
+                new GetDatafeedsStatsAction.Request(config.getId())
+            ).actionGet();
+            assertEquals(DatafeedState.STARTED, statsResponse.getResponse().results().get(0).getDatafeedState());
+        }, 30, TimeUnit.SECONDS);
+
+        // Create a problem that will make the job fail when it restarts on a different node
+        String snapshotId = "123";
+        ModelSnapshot modelSnapshot = new ModelSnapshot.Builder(jobId).setSnapshotId(snapshotId).setTimestamp(new Date()).build();
+        JobResultsPersister jobResultsPersister = internalCluster().getInstance(
+            JobResultsPersister.class,
+            internalCluster().getMasterName()
+        );
+        jobResultsPersister.persistModelSnapshot(modelSnapshot, WriteRequest.RefreshPolicy.IMMEDIATE, () -> true);
+        UpdateJobAction.Request updateJobRequest = UpdateJobAction.Request.internal(
+            jobId,
+            new JobUpdate.Builder(jobId).setModelSnapshotId(snapshotId).build()
+        );
+        client().execute(UpdateJobAction.INSTANCE, updateJobRequest).actionGet();
+        refresh(AnomalyDetectorsIndex.resultsWriteAlias(jobId));
+
+        // Make the job move to a different node
+        internalCluster().stopNode(nodeRunningJob.getName());
+
+        // Wait for the job to fail during reassignment
+        assertBusy(() -> {
+            GetJobsStatsAction.Response statsResponse = client().execute(
+                GetJobsStatsAction.INSTANCE,
+                new GetJobsStatsAction.Request(job.getId())
+            ).actionGet();
+            assertEquals(JobState.FAILED, statsResponse.getResponse().results().get(0).getState());
+        }, 30, TimeUnit.SECONDS);
+
+        // The datafeed should then be stopped
+        assertBusy(() -> {
+            GetDatafeedsStatsAction.Response statsResponse = client().execute(
+                GetDatafeedsStatsAction.INSTANCE,
+                new GetDatafeedsStatsAction.Request(config.getId())
+            ).actionGet();
+            assertEquals(DatafeedState.STOPPED, statsResponse.getResponse().results().get(0).getDatafeedState());
+        }, 30, TimeUnit.SECONDS);
 
+        // Force close the failed job to clean up
+        client().execute(CloseJobAction.INSTANCE, new CloseJobAction.Request(jobId).setForce(true)).actionGet();
     }
 
     private void setupJobWithoutDatafeed(String jobId, ByteSizeValue modelMemoryLimit) throws Exception {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java

Lines changed: 1 addition & 1 deletion
@@ -145,7 +145,7 @@ private AssignmentFailure checkAssignment() {
         }
 
         if (jobState.isAnyOf(JobState.OPENING, JobState.OPENED) == false) {
-            // lets try again later when the job has been opened:
+            // let's try again later when the job has been opened:
             String reason = "cannot start datafeed ["
                 + datafeedId
                 + "], because the job's ["

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 7 additions & 15 deletions
@@ -403,13 +403,7 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
         // Step 3. Set scheduled events on message and write update process message
         ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(events -> {
             updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
-            communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
-                if (e == null) {
-                    handler.accept(null);
-                } else {
-                    handler.accept(e);
-                }
-            });
+            communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> handler.accept(e));
         }, handler);
 
         // Step 2. Set the filters on the message and get scheduled events
@@ -544,20 +538,18 @@ public void openJob(
 
         // Start the process
         ActionListener<Boolean> stateAliasHandler = ActionListener.wrap(
-            r -> {
-                jobManager.getJob(
-                    jobId,
-                    ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
-                );
-            },
+            r -> jobManager.getJob(
+                jobId,
+                ActionListener.wrap(job -> startProcess(jobTask, job, closeHandler), e -> closeHandler.accept(e, true))
+            ),
             e -> {
                 if (ExceptionsHelper.unwrapCause(e) instanceof InvalidAliasNameException) {
                     String msg = "Detected a problem with your setup of machine learning, the state index alias ["
                         + AnomalyDetectorsIndex.jobStateIndexWriteAlias()
                        + "] exists as index but must be an alias.";
                     logger.error(new ParameterizedMessage("[{}] {}", jobId, msg), e);
-                    auditor.error(jobId, msg);
-                    setJobState(jobTask, JobState.FAILED, msg, e2 -> closeHandler.accept(e, true));
+                    // The close handler is responsible for auditing this and setting the job state to failed
+                    closeHandler.accept(new IllegalStateException(msg, e), true);
                 } else {
                     closeHandler.accept(e, true);
                 }

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/task/OpenJobPersistentTasksExecutor.java

Lines changed: 63 additions & 23 deletions
@@ -38,6 +38,7 @@
 import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
 import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
 import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
+import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
 import org.elasticsearch.xpack.core.ml.job.config.Blocked;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.job.config.JobState;
@@ -286,8 +287,8 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams
             return;
         }
 
-        ActionListener<Boolean> hasRunningDatafeedTaskListener = ActionListener.wrap(hasRunningDatafeed -> {
-            if (hasRunningDatafeed && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
+        ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
+            if (runningDatafeedId != null && isMasterNodeVersionOnOrAfter(MIN_MASTER_NODE_VERSION_FOR_REVERTING_TO_CURRENT_SNAPSHOT)) {
 
                 // This job has a running datafeed attached to it.
                 // In order to prevent gaps in the model we revert to the current snapshot deleting intervening results.
@@ -307,45 +308,84 @@ private void runJob(JobTask jobTask, JobState jobState, OpenJobAction.JobParams
             }
         });
 
-        hasRunningDatafeedTask(jobTask.getJobId(), hasRunningDatafeedTaskListener);
+        getRunningDatafeed(jobTask.getJobId(), getRunningDatafeedListener);
     }
 
     private void failTask(JobTask jobTask, String reason) {
+        String jobId = jobTask.getJobId();
+        auditor.error(jobId, reason);
         JobTaskState failedState = new JobTaskState(JobState.FAILED, jobTask.getAllocationId(), reason);
-        jobTask.updatePersistentTaskState(
-            failedState,
-            ActionListener.wrap(
-                r -> logger.debug(() -> new ParameterizedMessage("[{}] updated task state to failed", jobTask.getJobId())),
-                e -> {
-                    logger.error(
-                        new ParameterizedMessage(
-                            "[{}] error while setting task state to failed; marking task as failed",
-                            jobTask.getJobId()
-                        ),
-                        e
-                    );
-                    jobTask.markAsFailed(e);
-                }
-            )
-        );
+        jobTask.updatePersistentTaskState(failedState, ActionListener.wrap(r -> {
+            logger.debug("[{}] updated task state to failed", jobId);
+            stopAssociatedDatafeedForFailedJob(jobId);
+        }, e -> {
+            logger.error(new ParameterizedMessage("[{}] error while setting task state to failed; marking task as failed", jobId), e);
+            jobTask.markAsFailed(e);
+            stopAssociatedDatafeedForFailedJob(jobId);
+        }));
+    }
+
+    private void stopAssociatedDatafeedForFailedJob(String jobId) {
+
+        if (autodetectProcessManager.isNodeDying()) {
+            // The node shutdown caught us at a bad time, and we cannot stop the datafeed
+            return;
+        }
+
+        ActionListener<String> getRunningDatafeedListener = ActionListener.wrap(runningDatafeedId -> {
+            if (runningDatafeedId == null) {
+                return;
+            }
+            StopDatafeedAction.Request request = new StopDatafeedAction.Request(runningDatafeedId);
+            request.setForce(true);
+            executeAsyncWithOrigin(
+                client,
+                ML_ORIGIN,
+                StopDatafeedAction.INSTANCE,
+                request,
+                ActionListener.wrap(
+                    // StopDatafeedAction will audit the stopping of the datafeed if it succeeds so we don't need to do that here
+                    r -> logger.info("[{}] stopped associated datafeed [{}] after job failure", jobId, runningDatafeedId),
+                    e -> {
+                        if (autodetectProcessManager.isNodeDying() == false) {
+                            logger.error(
                                new ParameterizedMessage(
                                    "[{}] failed to stop associated datafeed [{}] after job failure",
                                    jobId,
                                    runningDatafeedId
                                ),
                                e
                            );
+                            auditor.error(jobId, "failed to stop associated datafeed after job failure");
+                        }
+                    }
+                )
+            );
+        }, e -> {
+            if (autodetectProcessManager.isNodeDying() == false) {
+                logger.error(new ParameterizedMessage("[{}] failed to search for associated datafeed", jobId), e);
+            }
+        });
+
+        getRunningDatafeed(jobId, getRunningDatafeedListener);
     }
 
     private boolean isMasterNodeVersionOnOrAfter(Version version) {
         return clusterState.nodes().getMasterNode().getVersion().onOrAfter(version);
     }
 
-    private void hasRunningDatafeedTask(String jobId, ActionListener<Boolean> listener) {
+    private void getRunningDatafeed(String jobId, ActionListener<String> listener) {
         ActionListener<Set<String>> datafeedListener = ActionListener.wrap(datafeeds -> {
             assert datafeeds.size() <= 1;
             if (datafeeds.isEmpty()) {
-                listener.onResponse(false);
+                listener.onResponse(null);
                 return;
             }
 
             String datafeedId = datafeeds.iterator().next();
             PersistentTasksCustomMetadata tasks = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
             PersistentTasksCustomMetadata.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
-            listener.onResponse(datafeedTask != null);
+            listener.onResponse(datafeedTask != null ? datafeedId : null);
         }, listener::onFailure);
 
         datafeedConfigProvider.findDatafeedIdsForJobIds(Collections.singleton(jobId), datafeedListener);
@@ -443,7 +483,7 @@ private void openJob(JobTask jobTask) {
                 }
             } else if (autodetectProcessManager.isNodeDying() == false) {
                 logger.error(new ParameterizedMessage("[{}] failed to open job", jobTask.getJobId()), e2);
-                failTask(jobTask, "failed to open job");
+                failTask(jobTask, "failed to open job: " + e2.getMessage());
             }
         });
     }
