diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 5ac3a01690456..bcaa4b260d751 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -111,9 +111,6 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency); clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled); - - // listen for state changes (this node starts/stops being the elected master, or new nodes are added) - clusterService.addListener(this); } private void setEnabled(boolean enabled) { diff --git a/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java b/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java index 0b17cfecf6c34..364deb98e7497 100644 --- a/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java +++ b/server/src/main/java/org/elasticsearch/cluster/LocalNodeMasterListener.java @@ -33,20 +33,5 @@ public interface LocalNodeMasterListener { * Called when the local node used to be the master, a new master was elected and it's no longer the local node. */ void offMaster(); - - /** - * The name of the executor that the implementation of the callbacks of this lister should be executed on. The thread - * that is responsible for managing instances of this lister is the same thread handling the cluster state events. If - * the work done is the callbacks above is inexpensive, this value may be - * {@link org.elasticsearch.threadpool.ThreadPool.Names#SAME SAME} (indicating that the callbacks will run on the same thread - * as the cluster state events are fired with). On the other hand, if the logic in the callbacks are heavier and take - * longer to process (or perhaps involve blocking due to IO operations), prefer to execute them on a separate more appropriate - * executor (eg. {@link org.elasticsearch.threadpool.ThreadPool.Names#GENERIC GENERIC} - * or {@link org.elasticsearch.threadpool.ThreadPool.Names#MANAGEMENT MANAGEMENT}). - * - * @return The name of the executor that will run the callbacks of this listener. - */ - String executorName(); - } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index 4d94cc31d3da8..005f2279b264f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -114,7 +114,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings this.clusterSettings = clusterSettings; this.threadPool = threadPool; this.state = new AtomicReference<>(); - this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool); + this.localNodeMasterListeners = new LocalNodeMasterListeners(); this.nodeName = nodeName; this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings); @@ -611,11 +611,9 @@ public void run() { private static class LocalNodeMasterListeners implements ClusterStateListener { private final List listeners = new CopyOnWriteArrayList<>(); - private final ThreadPool threadPool; private volatile boolean master = false; - private LocalNodeMasterListeners(ThreadPool threadPool) { - this.threadPool = threadPool; + private LocalNodeMasterListeners() { } @Override @@ -623,17 +621,20 @@ public void clusterChanged(ClusterChangedEvent event) { if (!master && event.localNodeMaster()) { master = true; for (LocalNodeMasterListener listener : listeners) { - java.util.concurrent.Executor executor = threadPool.executor(listener.executorName()); - executor.execute(new OnMasterRunnable(listener)); + try { + listener.onMaster(); + } catch (Exception e) { + logger.warn("failed to notify LocalNodeMasterListener", e); + } } - return; - } - - if (master && !event.localNodeMaster()) { + } else if (master && !event.localNodeMaster()) { master = false; for (LocalNodeMasterListener listener : listeners) { - java.util.concurrent.Executor executor = threadPool.executor(listener.executorName()); - executor.execute(new OffMasterRunnable(listener)); + try { + listener.offMaster(); + } catch (Exception e) { + logger.warn("failed to notify LocalNodeMasterListener", e); + } } } } @@ -644,34 +645,6 @@ private void add(LocalNodeMasterListener listener) { } - private static class OnMasterRunnable implements Runnable { - - private final LocalNodeMasterListener listener; - - private OnMasterRunnable(LocalNodeMasterListener listener) { - this.listener = listener; - } - - @Override - public void run() { - listener.onMaster(); - } - } - - private static class OffMasterRunnable implements Runnable { - - private final LocalNodeMasterListener listener; - - private OffMasterRunnable(LocalNodeMasterListener listener) { - this.listener = listener; - } - - @Override - public void run() { - listener.offMaster(); - } - } - // this one is overridden in tests so we can control time protected long currentTimeInMillis() { return threadPool.relativeTimeInMillis(); diff --git a/server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java b/server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java index d5164939e1f57..3cb20075ac4e1 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ConsistentSettingsService.java @@ -29,8 +29,10 @@ import org.elasticsearch.common.Priority; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.hash.MessageDigests; -import org.elasticsearch.threadpool.ThreadPool; +import javax.crypto.SecretKey; +import javax.crypto.SecretKeyFactory; +import javax.crypto.spec.PBEKeySpec; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.spec.InvalidKeySpecException; @@ -45,10 +47,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; -import javax.crypto.SecretKey; -import javax.crypto.SecretKeyFactory; -import javax.crypto.spec.PBEKeySpec; - /** * Used to publish secure setting hashes in the cluster state and to validate those hashes against the local values of those same settings. * This is colloquially referred to as the secure setting consistency check. It will publish and verify hashes only for the collection @@ -247,11 +245,6 @@ public void onFailure(String source, Exception e) { public void offMaster() { logger.trace("I am no longer master, nothing to do"); } - - @Override - public String executorName() { - return ThreadPool.Names.SAME; - } } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index c55831ffa7f5a..523da94c0ac62 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -1137,7 +1137,10 @@ private List getCustomNameResolvers(List {}); clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService()); diff --git a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java index 5cb74615a6fe2..fc1d829749944 100644 --- a/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/service/ClusterApplierServiceTests.java @@ -293,11 +293,6 @@ public void onMaster() { public void offMaster() { isMaster.set(false); } - - @Override - public String executorName() { - return ThreadPool.Names.SAME; - } }); ClusterState state = timedClusterApplierService.state(); diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 316f36af1e362..010923ec4fed8 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -174,7 +174,9 @@ protected ClusterInfoService newClusterInfoService(Settings settings, ClusterSer if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) { return super.newClusterInfoService(settings, clusterService, threadPool, client); } else { - return new MockInternalClusterInfoService(settings, clusterService, threadPool, client); + final MockInternalClusterInfoService service = new MockInternalClusterInfoService(settings, clusterService, threadPool, client); + clusterService.addListener(service); + return service; } } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java index b4ea7b59fe284..028b69637bdf8 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyMaintenanceService.java @@ -94,11 +94,6 @@ public void offMaster() { } } - @Override - public String executorName() { - return ThreadPool.Names.GENERIC; - } - private void scheduleNext() { if (isMaster) { try { diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java index 9ffa840d1a2d4..d1e4d4063c5b2 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java @@ -15,7 +15,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -51,7 +50,7 @@ * A service which runs the {@link LifecyclePolicy}s associated with indexes. */ public class IndexLifecycleService - implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, LocalNodeMasterListener, IndexEventListener { + implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener { private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class); private static final Set IGNORE_STEPS_MAINTENANCE_REQUESTED = Collections.singleton(ShrinkStep.NAME); private volatile boolean isMaster = false; @@ -82,7 +81,6 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings); clusterService.addStateApplier(this); clusterService.addListener(this); - clusterService.addLocalNodeMasterListener(this); clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING, this::updatePollInterval); } @@ -121,13 +119,11 @@ public ClusterState moveClusterStateToPreviouslyFailedStep(ClusterState currentS return newState; } - @Override - public void onMaster() { - this.isMaster = true; + // package private for testing + void onMaster(ClusterState clusterState) { maybeScheduleJob(); - ClusterState clusterState = clusterService.state(); - IndexLifecycleMetadata currentMetadata = clusterState.metadata().custom(IndexLifecycleMetadata.TYPE); + final IndexLifecycleMetadata currentMetadata = clusterState.metadata().custom(IndexLifecycleMetadata.TYPE); if (currentMetadata != null) { OperationMode currentMode = currentMetadata.getOperationMode(); if (OperationMode.STOPPED.equals(currentMode)) { @@ -184,17 +180,6 @@ public void onMaster() { } } - @Override - public void offMaster() { - this.isMaster = false; - cancelJob(); - } - - @Override - public String executorName() { - return ThreadPool.Names.MANAGEMENT; - } - @Override public void beforeIndexAddedToCluster(Index index, Settings indexSettings) { if (shouldParseIndexName(indexSettings)) { @@ -237,7 +222,20 @@ private synchronized void maybeScheduleJob() { @Override public void clusterChanged(ClusterChangedEvent event) { - IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE); + // Instead of using a LocalNodeMasterListener to track master changes, this service will + // track them here to avoid conditions where master listener events run after other + // listeners that depend on what happened in the master listener + final boolean prevIsMaster = this.isMaster; + if (prevIsMaster != event.localNodeMaster()) { + this.isMaster = event.localNodeMaster(); + if (this.isMaster) { + onMaster(event.state()); + } else { + cancelJob(); + } + } + + final IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE); if (this.isMaster && lifecycleMetadata != null) { triggerPolicies(event.state(), true); } diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java new file mode 100644 index 0000000000000..e19a3066fec21 --- /dev/null +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/package-info.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +/** + * This is the Index Lifecycle Management (ILM) main package. + * + * The ILM entry point is {@link org.elasticsearch.xpack.ilm.IndexLifecycleService} which calls into + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner}. + * + * The {@link org.elasticsearch.xpack.ilm.IndexLifecycleService} goes through the indices that have ILM policies configured, retrieves + * the current execution {@link org.elasticsearch.xpack.core.ilm.Step.StepKey} from the index's + * {@link org.elasticsearch.xpack.core.ilm.LifecycleExecutionState} and dispatches the step execution to the appropriate + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner} method. + * This happens in: + *
    + *
  • {org.elasticsearch.xpack.ilm.IndexLifecycleService#clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)} when a master is + * elected (first election when the cluster starts up or due to the previous master having stepped down) and executes only + * {@link org.elasticsearch.xpack.core.ilm.AsyncActionStep}s + *
  • + *
  • + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleService#triggerPolicies(org.elasticsearch.cluster.ClusterState, boolean)} + * which serves 2 purposes: + *
      + *
    • + * Run policy steps that need to be triggered as a result of a cluster change (ie. + * {@link org.elasticsearch.xpack.core.ilm.ClusterStateActionStep} and + * {@link org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep}). This is triggered by the + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleService#clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)} + * callback. + *
    • + *
    • + * Run the {@link org.elasticsearch.xpack.core.ilm.AsyncWaitStep} periodic steps. These steps are configured to run + * every {@link org.elasticsearch.xpack.core.ilm.LifecycleSettings#LIFECYCLE_POLL_INTERVAL} + *
    • + *
    + *
  • + *
+ * + * The {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner} is the component that executes the ILM steps. It has 3 entry points that + * correspond to the steps taxonomy outlined above. Namely: + *
    + *
  • + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#maybeRunAsyncAction( + * org.elasticsearch.cluster.ClusterState, + * org.elasticsearch.cluster.metadata.IndexMetadata, + * java.lang.String, org.elasticsearch.xpack.core.ilm.Step.StepKey + * )} + * handles the execution of the async steps {@link org.elasticsearch.xpack.core.ilm.AsyncActionStep}. + *
  • + *
  • + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPolicyAfterStateChange( + * java.lang.String, + * org.elasticsearch.cluster.metadata.IndexMetadata + * )} + * handles the execution of steps that wait or need to react to cluster state changes, like + * {@link org.elasticsearch.xpack.core.ilm.ClusterStateActionStep} and {@link org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep} + *
  • + *
  • + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPeriodicStep( + * java.lang.String, + * org.elasticsearch.cluster.metadata.Metadata, + * org.elasticsearch.cluster.metadata.IndexMetadata + * )} + * handles the execution of async {@link org.elasticsearch.xpack.core.ilm.AsyncWaitStep} + *
  • + *
+ * + * The policy execution can be seen as a state machine which advances through every phase's (hot/warm/cold/delete) action's + * (rollover/forcemerge/etc) steps (eg. the {@link org.elasticsearch.xpack.core.ilm.RolloverAction} comprises a series of steps that need + * to be executed. It will first check if the rollover could be executed {@link org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep} + * and then rollover the index {@link org.elasticsearch.xpack.core.ilm.RolloverStep} followed by some more house-keeping steps). + * + * The ILM runner will advance last executed state (as indicated in + * {@link org.elasticsearch.xpack.core.ilm.LifecycleExecutionState#getStep()}) and execute the next step of the index policy as + * defined in the {@link org.elasticsearch.xpack.ilm.PolicyStepsRegistry}. + * Once all the steps of a policy are executed successfully the policy execution will reach the + * {@link org.elasticsearch.xpack.core.ilm.TerminalPolicyStep} and any changes made to the policy definition will not have any effect on + * the already completed policies. Even more, any changes made to the policy HOT phase will have *no* effect on the already in-progress HOT + * phase executions (the phase JSON representation being cached into the index metadata). However, a policy update to the WARM phase will + * *have* an effect on the policies that are currently in the HOT execution state as the entire WARM phase will be reloaded from the + * policy definition when transitioning to the phase. + * + * If a step execution fails, the policy execution state for the index will be moved into the + * {@link org.elasticsearch.xpack.core.ilm.ErrorStep}. + * Currently for certain periodic steps we will automatically retry the execution of the failed step until the step executes + * successfully (see {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#onErrorMaybeRetryFailedStep}). In order to see all retryable + * steps see {@link org.elasticsearch.xpack.core.ilm.Step#isRetryable()}. + * For steps that are not retryable the failed step can manually be retried using + * {@link org.elasticsearch.xpack.ilm.IndexLifecycleService#moveClusterStateToPreviouslyFailedStep}. + * + */ +package org.elasticsearch.xpack.ilm; + diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java index 145277e628743..d0bd1bcb448b7 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionService.java @@ -12,7 +12,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import org.elasticsearch.xpack.core.scheduler.CronSchedule; import org.elasticsearch.xpack.core.scheduler.SchedulerEngine; @@ -113,11 +112,6 @@ public void triggerRetention() { } } - @Override - public String executorName() { - return ThreadPool.Names.SAME; - } - @Override public void close() { if (this.running.compareAndSet(true, false)) { diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java index c4d3192eeb65a..a9da468279062 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java @@ -437,7 +437,7 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) { if (useOnMaster) { when(clusterService.state()).thenReturn(currentState); - indexLifecycleService.onMaster(); + indexLifecycleService.onMaster(currentState); } else { indexLifecycleService.triggerPolicies(currentState, randomBoolean()); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 2859bd0c6f732..be4f73a9579b2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -11,7 +11,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; @@ -22,7 +21,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -class MlInitializationService implements LocalNodeMasterListener, ClusterStateListener { +class MlInitializationService implements ClusterStateListener { private static final Logger logger = LogManager.getLogger(MlInitializationService.class); @@ -31,6 +30,8 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi private final MlDailyMaintenanceService mlDailyMaintenanceService; + private boolean isMaster = false; + MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, MlAssignmentNotifier mlAssignmentNotifier) { this(client, @@ -50,7 +51,6 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi this.client = Objects.requireNonNull(client); this.mlDailyMaintenanceService = dailyMaintenanceService; clusterService.addListener(this); - clusterService.addLocalNodeMasterListener(this); clusterService.addLifecycleListener(new LifecycleListener() { @Override public void afterStart() { @@ -67,19 +67,26 @@ public void beforeStop() { }); } - - @Override public void onMaster() { mlDailyMaintenanceService.start(); } - @Override public void offMaster() { mlDailyMaintenanceService.stop(); } @Override public void clusterChanged(ClusterChangedEvent event) { + final boolean prevIsMaster = this.isMaster; + if (prevIsMaster != event.localNodeMaster()) { + this.isMaster = event.localNodeMaster(); + if (this.isMaster) { + onMaster(); + } else { + offMaster(); + } + } + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { // Wait until the gateway has recovered from disk. return; @@ -87,7 +94,7 @@ public void clusterChanged(ClusterChangedEvent event) { // The atomic flag prevents multiple simultaneous attempts to create the // index if there is a flurry of cluster state updates in quick succession - if (event.localNodeMaster() && isIndexCreationInProgress.compareAndSet(false, true)) { + if (this.isMaster && isIndexCreationInProgress.compareAndSet(false, true)) { AnnotationIndex.createAnnotationsIndexIfNecessary(client, event.state(), ActionListener.wrap( r -> { isIndexCreationInProgress.set(false); @@ -102,11 +109,6 @@ public void clusterChanged(ClusterChangedEvent event) { } } - @Override - public String executorName() { - return ThreadPool.Names.GENERIC; - } - /** For testing */ MlDailyMaintenanceService getDailyMaintenanceService() { return mlDailyMaintenanceService; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java index bfb4661231943..50d237950886a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/MlMemoryTracker.java @@ -131,11 +131,6 @@ public void stop() { logger.debug("ML memory tracker stopped"); } - @Override - public String executorName() { - return MachineLearning.UTILITY_THREAD_POOL_NAME; - } - /** * Is the information in this object sufficiently up to date * for valid task assignment decisions to be made using it? @@ -222,7 +217,7 @@ public boolean asyncRefresh() { aVoid -> logger.trace("Job memory requirement refresh request completed successfully"), e -> logger.warn("Failed to refresh job memory requirements", e) ); - threadPool.executor(executorName()).execute( + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( () -> refresh(clusterService.state().getMetadata().custom(PersistentTasksCustomMetadata.TYPE), listener)); return true; } catch (EsRejectedExecutionException e) { @@ -339,7 +334,8 @@ private void iterateAnomalyDetectorJobTasks(Iterator threadPool.executor(executorName()).execute(() -> iterateAnomalyDetectorJobTasks(iterator, refreshComplete)), + mem -> threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME) + .execute(() -> iterateAnomalyDetectorJobTasks(iterator, refreshComplete)), refreshComplete::onFailure)); } else { refreshComplete.onResponse(null);