Skip to content

[ML] Fix calendar and filter updates from non-master nodes #31804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.ml.job;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -31,9 +32,26 @@
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request;
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response;

public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener {
/**
* This class serves as a queue for updates to the job process.
* Queueing is important for 2 reasons: first, it throttles the updates
* to the process, and second and most important, it preserves the order of the updates
* for actions that run on the master node. For preserving the order of the updates
* to the job config, it's necessary to handle the whole update chain on the master
* node. However, for updates to resources the job uses (e.g. calendars, filters),
* they can be handled on non-master nodes as long as the update process action
* is fetching the latest version of those resources from the index instead of
* using the version that existed while the handling action was at work. This makes
* sure that even if the order of updates gets reversed, the final process update
* will fetch the valid state of those external resources ensuring the process is
* in sync.
*/
public class UpdateJobProcessNotifier extends AbstractComponent {

private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class);

private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<UpdateHolder> orderedJobUpdates = new LinkedBlockingQueue<>(1000);

Expand All @@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {

@Override
public void beforeStart() {
start();
}

@Override
public void beforeStop() {
stop();
Expand All @@ -56,16 +80,6 @@ boolean submitJobUpdate(UpdateParams update, ActionListener<Boolean> listener) {
return orderedJobUpdates.offer(new UpdateHolder(update, listener));
}

@Override
public void onMaster() {
start();
}

@Override
public void offMaster() {
stop();
}

private void start() {
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
}
Expand All @@ -79,12 +93,6 @@ private void stop() {
}
}

@Override
public String executorName() {
// SAME is ok here, because both start() and stop() are inexpensive:
return ThreadPool.Names.SAME;
}

private void processNextUpdate() {
List<UpdateHolder> updates = new ArrayList<>(orderedJobUpdates.size());
try {
Expand All @@ -101,6 +109,15 @@ void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {
}
UpdateHolder updateHolder = updatesIterator.next();
UpdateParams update = updateHolder.update;

if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
assert clusterService.localNode().isMasterNode();
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
+ update.getJobId() + "] will be ignored");
executeProcessUpdates(updatesIterator);
return;
}

Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
update.isUpdateScheduledEvents());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,34 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -205,30 +203,29 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
}
}

public void writeUpdateProcessMessage(UpdateParams updateParams, List<ScheduledEvent> scheduledEvents,
BiConsumer<Void, Exception> handler) {
public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
if (updateParams.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
if (update.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig());
}

// Filters have to be written before detectors
if (updateParams.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter()));
if (update.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
}

// Add detector rules
if (updateParams.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
if (update.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
if (update.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
if (detectorUpdate.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules());
}
}
}

// Add scheduled events; null means there's no update but an empty list means we should clear any events in the process
if (scheduledEvents != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan());
if (update.getScheduledEvents() != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan());
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -22,35 +20,39 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
Expand Down Expand Up @@ -82,6 +84,8 @@
import java.util.function.Consumer;

import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class AutodetectProcessManager extends AbstractComponent {

Expand Down Expand Up @@ -156,7 +160,7 @@ public void onNodeStartup() {
}
}

public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
public synchronized void closeAllJobsOnThisNode(String reason) {
int numJobs = processByAllocation.size();
if (numJobs != 0) {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
Expand Down Expand Up @@ -322,8 +326,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
});
}

public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams,
Consumer<Exception> handler) {
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
Expand All @@ -332,25 +335,59 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
return;
}

UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder();
updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig());
updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates());

// Step 3. Set scheduled events on message and write update process message
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
events -> {
communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> {
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
},
handler::accept);

if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
}, handler
);

// Step 2. Set the filter on the message and get scheduled events
ActionListener<MlFilter> filterListener = ActionListener.wrap(
filter -> {
updateProcessMessage.setFilter(filter);

if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
} else {
eventsListener.onResponse(null);
}
}, handler
);

// Step 1. Get the filter
if (updateParams.getFilter() == null) {
filterListener.onResponse(null);
} else {
eventsListener.onResponse(null);
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request();
getFilterRequest.setFilterId(updateParams.getFilter().getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest,
new ActionListener<GetFiltersAction.Response>() {

@Override
public void onResponse(GetFiltersAction.Response response) {
filterListener.onResponse(response.getFilters().results().get(0));
}

@Override
public void onFailure(Exception e) {
handler.accept(e);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public MlFilter getFilter() {
return filter;
}

/**
* Returns true if the update params include a job update,
* ie an update to the job config directly rather than an
* update to external resources a job uses (e.g. calendars, filters).
*/
public boolean isJobUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
}

public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}
Expand Down
Loading