Skip to content

Commit

Permalink
Thread safe clean up of LocalNodeModeListeners (#60007)
Browse files Browse the repository at this point in the history
This commit continues on the work in #59801 and makes other
implementors of the LocalNodeMasterListener interface thread safe in
that they will no longer allow the callbacks to run on different
threads and possibly race each other. This also helps address other
issues where these events could be queued to wait for execution while
the service keeps moving forward thinking it is the master even when
that is not the case.

In order to accomplish this, the LocalNodeMasterListener no longer has
the executorName() method to prevent future uses that could encounter
this surprising behavior.

Each use was inspected and if the class was also a
ClusterStateListener, the implementation of LocalNodeMasterListener
was removed in favor of a single listener that combined the logic. A
single listener is used and there is currently no guarantee on execution
order between ClusterStateListeners and LocalNodeMasterListeners,
so a future change there could cause undesired consequences. For other
classes, the implementations of the callbacks were inspected and if the
operations were lightweight, the overriden executorName method was
removed to use the default, which runs on the same thread.

Backport of #59932
  • Loading branch information
jaymode authored Jul 22, 2020
1 parent 702c997 commit c8ef2e1
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
this::setEnabled);

// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
clusterService.addListener(this);
}

private void setEnabled(boolean enabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,5 @@ public interface LocalNodeMasterListener {
* Called when the local node used to be the master, a new master was elected and it's no longer the local node.
*/
void offMaster();

/**
* The name of the executor that the implementation of the callbacks of this lister should be executed on. The thread
* that is responsible for managing instances of this lister is the same thread handling the cluster state events. If
* the work done is the callbacks above is inexpensive, this value may be
* {@link org.elasticsearch.threadpool.ThreadPool.Names#SAME SAME} (indicating that the callbacks will run on the same thread
* as the cluster state events are fired with). On the other hand, if the logic in the callbacks are heavier and take
* longer to process (or perhaps involve blocking due to IO operations), prefer to execute them on a separate more appropriate
* executor (eg. {@link org.elasticsearch.threadpool.ThreadPool.Names#GENERIC GENERIC}
* or {@link org.elasticsearch.threadpool.ThreadPool.Names#MANAGEMENT MANAGEMENT}).
*
* @return The name of the executor that will run the callbacks of this listener.
*/
String executorName();

}

Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
this.localNodeMasterListeners = new LocalNodeMasterListeners(threadPool);
this.localNodeMasterListeners = new LocalNodeMasterListeners();
this.nodeName = nodeName;

this.slowTaskLoggingThreshold = CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(settings);
Expand Down Expand Up @@ -611,29 +611,30 @@ public void run() {
private static class LocalNodeMasterListeners implements ClusterStateListener {

private final List<LocalNodeMasterListener> listeners = new CopyOnWriteArrayList<>();
private final ThreadPool threadPool;
private volatile boolean master = false;

private LocalNodeMasterListeners(ThreadPool threadPool) {
this.threadPool = threadPool;
private LocalNodeMasterListeners() {
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (!master && event.localNodeMaster()) {
master = true;
for (LocalNodeMasterListener listener : listeners) {
java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
executor.execute(new OnMasterRunnable(listener));
try {
listener.onMaster();
} catch (Exception e) {
logger.warn("failed to notify LocalNodeMasterListener", e);
}
}
return;
}

if (master && !event.localNodeMaster()) {
} else if (master && !event.localNodeMaster()) {
master = false;
for (LocalNodeMasterListener listener : listeners) {
java.util.concurrent.Executor executor = threadPool.executor(listener.executorName());
executor.execute(new OffMasterRunnable(listener));
try {
listener.offMaster();
} catch (Exception e) {
logger.warn("failed to notify LocalNodeMasterListener", e);
}
}
}
}
Expand All @@ -644,34 +645,6 @@ private void add(LocalNodeMasterListener listener) {

}

private static class OnMasterRunnable implements Runnable {

private final LocalNodeMasterListener listener;

private OnMasterRunnable(LocalNodeMasterListener listener) {
this.listener = listener;
}

@Override
public void run() {
listener.onMaster();
}
}

private static class OffMasterRunnable implements Runnable {

private final LocalNodeMasterListener listener;

private OffMasterRunnable(LocalNodeMasterListener listener) {
this.listener = listener;
}

@Override
public void run() {
listener.offMaster();
}
}

// this one is overridden in tests so we can control time
protected long currentTimeInMillis() {
return threadPool.relativeTimeInMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.threadpool.ThreadPool;

import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
Expand All @@ -45,10 +47,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import javax.crypto.SecretKey;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;

/**
* Used to publish secure setting hashes in the cluster state and to validate those hashes against the local values of those same settings.
* This is colloquially referred to as the secure setting consistency check. It will publish and verify hashes only for the collection
Expand Down Expand Up @@ -247,11 +245,6 @@ public void onFailure(String source, Exception e) {
public void offMaster() {
logger.trace("I am no longer master, nothing to do");
}

@Override
public String executorName() {
return ThreadPool.Names.SAME;
}
}

}
5 changes: 4 additions & 1 deletion server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,10 @@ private List<NetworkService.CustomNameResolver> getCustomNameResolvers(List<Disc
/** Constructs a ClusterInfoService which may be mocked for tests. */
protected ClusterInfoService newClusterInfoService(Settings settings, ClusterService clusterService,
ThreadPool threadPool, NodeClient client) {
return new InternalClusterInfoService(settings, clusterService, threadPool, client);
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
clusterService.addListener(service);
return service;
}

/** Constructs a {@link org.elasticsearch.http.HttpServerTransport} which may be mocked for tests. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {

final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client);
clusterService.addListener(clusterInfoService);
clusterInfoService.addListener(ignored -> {});

clusterService.setNodeConnectionsService(ClusterServiceUtils.createNoOpNodeConnectionsService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,6 @@ public void onMaster() {
public void offMaster() {
isMaster.set(false);
}

@Override
public String executorName() {
return ThreadPool.Names.SAME;
}
});

ClusterState state = timedClusterApplierService.state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ protected ClusterInfoService newClusterInfoService(Settings settings, ClusterSer
if (getPluginsService().filterPlugins(MockInternalClusterInfoService.TestPlugin.class).isEmpty()) {
return super.newClusterInfoService(settings, clusterService, threadPool, client);
} else {
return new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
final MockInternalClusterInfoService service = new MockInternalClusterInfoService(settings, clusterService, threadPool, client);
clusterService.addListener(service);
return service;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,6 @@ public void offMaster() {
}
}

@Override
public String executorName() {
return ThreadPool.Names.GENERIC;
}

private void scheduleNext() {
if (isMaster) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -51,7 +50,7 @@
* A service which runs the {@link LifecyclePolicy}s associated with indexes.
*/
public class IndexLifecycleService
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, LocalNodeMasterListener, IndexEventListener {
implements ClusterStateListener, ClusterStateApplier, SchedulerEngine.Listener, Closeable, IndexEventListener {
private static final Logger logger = LogManager.getLogger(IndexLifecycleService.class);
private static final Set<String> IGNORE_STEPS_MAINTENANCE_REQUESTED = Collections.singleton(ShrinkStep.NAME);
private volatile boolean isMaster = false;
Expand Down Expand Up @@ -82,7 +81,6 @@ public IndexLifecycleService(Settings settings, Client client, ClusterService cl
this.pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
clusterService.addStateApplier(this);
clusterService.addListener(this);
clusterService.addLocalNodeMasterListener(this);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING,
this::updatePollInterval);
}
Expand Down Expand Up @@ -121,13 +119,11 @@ public ClusterState moveClusterStateToPreviouslyFailedStep(ClusterState currentS
return newState;
}

@Override
public void onMaster() {
this.isMaster = true;
// package private for testing
void onMaster(ClusterState clusterState) {
maybeScheduleJob();

ClusterState clusterState = clusterService.state();
IndexLifecycleMetadata currentMetadata = clusterState.metadata().custom(IndexLifecycleMetadata.TYPE);
final IndexLifecycleMetadata currentMetadata = clusterState.metadata().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata != null) {
OperationMode currentMode = currentMetadata.getOperationMode();
if (OperationMode.STOPPED.equals(currentMode)) {
Expand Down Expand Up @@ -184,17 +180,6 @@ public void onMaster() {
}
}

@Override
public void offMaster() {
this.isMaster = false;
cancelJob();
}

@Override
public String executorName() {
return ThreadPool.Names.MANAGEMENT;
}

@Override
public void beforeIndexAddedToCluster(Index index, Settings indexSettings) {
if (shouldParseIndexName(indexSettings)) {
Expand Down Expand Up @@ -237,7 +222,20 @@ private synchronized void maybeScheduleJob() {

@Override
public void clusterChanged(ClusterChangedEvent event) {
IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE);
// Instead of using a LocalNodeMasterListener to track master changes, this service will
// track them here to avoid conditions where master listener events run after other
// listeners that depend on what happened in the master listener
final boolean prevIsMaster = this.isMaster;
if (prevIsMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster();
if (this.isMaster) {
onMaster(event.state());
} else {
cancelJob();
}
}

final IndexLifecycleMetadata lifecycleMetadata = event.state().metadata().custom(IndexLifecycleMetadata.TYPE);
if (this.isMaster && lifecycleMetadata != null) {
triggerPolicies(event.state(), true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

/**
* This is the Index Lifecycle Management (ILM) main package.
*
* The ILM entry point is {@link org.elasticsearch.xpack.ilm.IndexLifecycleService} which calls into
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner}.
*
* The {@link org.elasticsearch.xpack.ilm.IndexLifecycleService} goes through the indices that have ILM policies configured, retrieves
* the current execution {@link org.elasticsearch.xpack.core.ilm.Step.StepKey} from the index's
* {@link org.elasticsearch.xpack.core.ilm.LifecycleExecutionState} and dispatches the step execution to the appropriate
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner} method.
* This happens in:
* <ul>
* <li>{org.elasticsearch.xpack.ilm.IndexLifecycleService#clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)} when a master is
* elected (first election when the cluster starts up or due to the previous master having stepped down) and executes only
* {@link org.elasticsearch.xpack.core.ilm.AsyncActionStep}s
* </li>
* <li>
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleService#triggerPolicies(org.elasticsearch.cluster.ClusterState, boolean)}
* which serves 2 purposes:
* <ul>
* <li>
* Run policy steps that need to be triggered as a result of a cluster change (ie.
* {@link org.elasticsearch.xpack.core.ilm.ClusterStateActionStep} and
* {@link org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep}). This is triggered by the
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleService#clusterChanged(org.elasticsearch.cluster.ClusterChangedEvent)}
* callback.
* </li>
* <li>
* Run the {@link org.elasticsearch.xpack.core.ilm.AsyncWaitStep} periodic steps. These steps are configured to run
* every {@link org.elasticsearch.xpack.core.ilm.LifecycleSettings#LIFECYCLE_POLL_INTERVAL}
* </li>
* </ul>
* </li>
* </ul>
*
* The {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner} is the component that executes the ILM steps. It has 3 entry points that
* correspond to the steps taxonomy outlined above. Namely:
* <ul>
* <li>
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#maybeRunAsyncAction(
* org.elasticsearch.cluster.ClusterState,
* org.elasticsearch.cluster.metadata.IndexMetadata,
* java.lang.String, org.elasticsearch.xpack.core.ilm.Step.StepKey
* )}
* handles the execution of the async steps {@link org.elasticsearch.xpack.core.ilm.AsyncActionStep}.
* </li>
* <li>
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPolicyAfterStateChange(
* java.lang.String,
* org.elasticsearch.cluster.metadata.IndexMetadata
* )}
* handles the execution of steps that wait or need to react to cluster state changes, like
* {@link org.elasticsearch.xpack.core.ilm.ClusterStateActionStep} and {@link org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep}
* </li>
* <li>
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#runPeriodicStep(
* java.lang.String,
* org.elasticsearch.cluster.metadata.Metadata,
* org.elasticsearch.cluster.metadata.IndexMetadata
* )}
* handles the execution of async {@link org.elasticsearch.xpack.core.ilm.AsyncWaitStep}
* </li>
* </ul>
*
* The policy execution can be seen as a state machine which advances through every phase's (hot/warm/cold/delete) action's
* (rollover/forcemerge/etc) steps (eg. the {@link org.elasticsearch.xpack.core.ilm.RolloverAction} comprises a series of steps that need
* to be executed. It will first check if the rollover could be executed {@link org.elasticsearch.xpack.core.ilm.WaitForRolloverReadyStep}
* and then rollover the index {@link org.elasticsearch.xpack.core.ilm.RolloverStep} followed by some more house-keeping steps).
*
* The ILM runner will advance last executed state (as indicated in
* {@link org.elasticsearch.xpack.core.ilm.LifecycleExecutionState#getStep()}) and execute the next step of the index policy as
* defined in the {@link org.elasticsearch.xpack.ilm.PolicyStepsRegistry}.
* Once all the steps of a policy are executed successfully the policy execution will reach the
* {@link org.elasticsearch.xpack.core.ilm.TerminalPolicyStep} and any changes made to the policy definition will not have any effect on
* the already completed policies. Even more, any changes made to the policy HOT phase will have *no* effect on the already in-progress HOT
* phase executions (the phase JSON representation being cached into the index metadata). However, a policy update to the WARM phase will
* *have* an effect on the policies that are currently in the HOT execution state as the entire WARM phase will be reloaded from the
* policy definition when transitioning to the phase.
*
* If a step execution fails, the policy execution state for the index will be moved into the
* {@link org.elasticsearch.xpack.core.ilm.ErrorStep}.
* Currently for certain periodic steps we will automatically retry the execution of the failed step until the step executes
* successfully (see {@link org.elasticsearch.xpack.ilm.IndexLifecycleRunner#onErrorMaybeRetryFailedStep}). In order to see all retryable
* steps see {@link org.elasticsearch.xpack.core.ilm.Step#isRetryable()}.
* For steps that are not retryable the failed step can manually be retried using
* {@link org.elasticsearch.xpack.ilm.IndexLifecycleService#moveClusterStateToPreviouslyFailedStep}.
*
*/
package org.elasticsearch.xpack.ilm;

Loading

0 comments on commit c8ef2e1

Please sign in to comment.