Write CS asynchronously on data-only nodes #50782

Merged
166 changes: 162 additions & 4 deletions server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
@@ -20,11 +20,14 @@
package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -37,21 +40,32 @@
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeMetaData;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;

/**
* Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
*
@@ -100,16 +114,20 @@ public void start(Settings settings, TransportService transportService, ClusterS
}

final PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter();
final LucenePersistedState lucenePersistedState;
final PersistedState persistedState;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.version(lastAcceptedVersion)
.metaData(upgradeMetaDataForNode(metaData, metaDataIndexUpgradeService, metaDataUpgrader))
.build());
lucenePersistedState = new LucenePersistedState(
persistenceWriter, currentTerm, clusterState);
if (DiscoveryNode.isMasterNode(settings)) {
persistedState = new LucenePersistedState(persistenceWriter, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(settings, transportService.getThreadPool(),
new LucenePersistedState(persistenceWriter, currentTerm, clusterState));
}
if (DiscoveryNode.isDataNode(settings)) {
metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality)
} else {
@@ -125,7 +143,7 @@ public void start(Settings settings, TransportService transportService, ClusterS
}
}

persistedState.set(lucenePersistedState);
this.persistedState.set(persistedState);
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}
@@ -227,6 +245,146 @@ public void close() throws IOException {
IOUtils.close(persistedState.get());
}

// visible for testing
public boolean allPendingAsyncStatesWritten() {
final PersistedState ps = persistedState.get();
if (ps instanceof AsyncLucenePersistedState) {
return ((AsyncLucenePersistedState) ps).allPendingAsyncStatesWritten();
} else {
return true;
}
}

static class AsyncLucenePersistedState extends InMemoryPersistedState {

private static final Logger logger = LogManager.getLogger(AsyncLucenePersistedState.class);

static final String THREAD_NAME = "AsyncLucenePersistedState#updateTask";

private final EsThreadPoolExecutor threadPoolExecutor;
private final PersistedState persistedState;

boolean newCurrentTermQueued = false;
boolean newStateQueued = false;

private final Object mutex = new Object();

AsyncLucenePersistedState(Settings settings, ThreadPool threadPool, PersistedState persistedState) {
super(persistedState.getCurrentTerm(), persistedState.getLastAcceptedState());
final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
threadPoolExecutor = EsExecutors.newFixed(
nodeName + "/" + THREAD_NAME,
1, 1,
daemonThreadFactory(nodeName, THREAD_NAME),
threadPool.getThreadContext());
this.persistedState = persistedState;
}

@Override
public void setCurrentTerm(long currentTerm) {
synchronized (mutex) {
super.setCurrentTerm(currentTerm);
if (newCurrentTermQueued) {
logger.trace("term update already queued (setting term to {})", currentTerm);
} else {
logger.trace("queuing term update (setting term to {})", currentTerm);
newCurrentTermQueued = true;
scheduleUpdate();
}
}
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
synchronized (mutex) {
super.setLastAcceptedState(clusterState);
if (newStateQueued) {
logger.trace("cluster state update already queued (setting cluster state to {})", clusterState.version());
} else {
logger.trace("queuing cluster state update (setting cluster state to {})", clusterState.version());
newStateQueued = true;
scheduleUpdate();
}
}
}

private void scheduleUpdate() {
assert Thread.holdsLock(mutex);
try {
threadPoolExecutor.execute(new AbstractRunnable() {

@Override
public void onFailure(Exception e) {
logger.error("Exception occurred when storing new meta data", e);
}

@Override
protected void doRun() {
final Long term;
final ClusterState clusterState;
synchronized (mutex) {
if (newCurrentTermQueued) {
term = getCurrentTerm();
newCurrentTermQueued = false;
} else {
term = null;
}
if (newStateQueued) {
clusterState = getLastAcceptedState();
newStateQueued = false;
} else {
clusterState = null;
}
}
// write current term before last accepted state so that it is never below term in last accepted state
if (term != null) {
persistedState.setCurrentTerm(term);
}
if (clusterState != null) {
persistedState.setLastAcceptedState(resetVotingConfiguration(clusterState));
}
}
});
} catch (EsRejectedExecutionException e) {
// ignore rejections while we are shutting down; there is nothing else to be done here
if (threadPoolExecutor.isShutdown() == false) {
assert false : "only expect rejections when shutting down";
throw e;
}
}
}

static final CoordinationMetaData.VotingConfiguration staleStateConfiguration =
new CoordinationMetaData.VotingConfiguration(Collections.singleton("STALE_STATE_CONFIG"));

static ClusterState resetVotingConfiguration(ClusterState clusterState) {
CoordinationMetaData newCoordinationMetaData = CoordinationMetaData.builder(clusterState.coordinationMetaData())
.lastAcceptedConfiguration(staleStateConfiguration)
.lastCommittedConfiguration(staleStateConfiguration)
.build();
return ClusterState.builder(clusterState).metaData(MetaData.builder(clusterState.metaData())
.coordinationMetaData(newCoordinationMetaData).build()).build();
}

@Override
public void close() throws IOException {
try {
ThreadPool.terminate(threadPoolExecutor, 10, TimeUnit.SECONDS);
} finally {
persistedState.close();
}
}

boolean allPendingAsyncStatesWritten() {
synchronized (mutex) {
if (newCurrentTermQueued || newStateQueued) {
return false;
}
return threadPoolExecutor.getActiveCount() == 0;
}
}
}

/**
* Encapsulates the incremental writing of metadata to a {@link PersistedClusterStateService.Writer}.
*/
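
The heart of AsyncLucenePersistedState above is a coalescing, single-writer pattern: each setter records the newest term or cluster state under a mutex, marks it as queued, and schedules at most one drain task on a one-thread executor; the drain then writes whatever is newest, term before state, so the persisted term is never below the term carried by the persisted state. Below is a minimal, self-contained sketch of that pattern; the names CoalescingWriter and SlowStore are invented for illustration and are not part of the PR.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for the expensive persistence layer (a Lucene index in the PR).
interface SlowStore {
    void writeTerm(long term);
    void writeState(String state);
}

class CoalescingWriter implements AutoCloseable {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final SlowStore store;
    private final Object mutex = new Object();

    private long term;
    private String state;
    private boolean termQueued;
    private boolean stateQueued;

    CoalescingWriter(SlowStore store) {
        this.store = store;
    }

    void setTerm(long newTerm) {
        synchronized (mutex) {
            term = newTerm;
            if (termQueued == false) {        // coalesce: at most one pending term write
                termQueued = true;
                executor.execute(this::drain);
            }
        }
    }

    void setState(String newState) {
        synchronized (mutex) {
            state = newState;
            if (stateQueued == false) {       // coalesce: at most one pending state write
                stateQueued = true;
                executor.execute(this::drain);
            }
        }
    }

    private void drain() {
        final Long pendingTerm;
        final String pendingState;
        synchronized (mutex) {
            if (termQueued) {
                pendingTerm = term;
                termQueued = false;
            } else {
                pendingTerm = null;
            }
            if (stateQueued) {
                pendingState = state;
                stateQueued = false;
            } else {
                pendingState = null;
            }
        }
        // Write the term before the state so the persisted term is never
        // below the term embedded in the persisted state.
        if (pendingTerm != null) {
            store.writeTerm(pendingTerm);
        }
        if (pendingState != null) {
            store.writeState(pendingState);
        }
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}

Scheduling a drain from both setters can occasionally run drain with nothing left to do; that is harmless, and it keeps each setter cheap because the slow write always happens on the dedicated thread rather than on the caller's thread.
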
@@ -28,6 +28,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.Node;
@@ -259,6 +260,7 @@ public void test3MasterNodes2Failed() throws Exception {
logger.info("--> stop 1st master-eligible node and data-only node");
NodeEnvironment nodeEnvironment = internalCluster().getMasterNodeInstance(NodeEnvironment.class);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(masterNodes.get(0)));
assertBusy(() -> internalCluster().getInstance(GatewayMetaState.class, dataNode).allPendingAsyncStatesWritten());
internalCluster().stopRandomDataNode();

logger.info("--> unsafely-bootstrap 1st master-eligible node");
@@ -327,6 +329,7 @@ public void testAllMasterEligibleNodesFailedDanglingIndexImport() throws Excepti

logger.info("--> stop data-only node and detach it from the old cluster");
Settings dataNodeDataPathSettings = internalCluster().dataPathSettings(dataNode);
assertBusy(() -> internalCluster().getInstance(GatewayMetaState.class, dataNode).allPendingAsyncStatesWritten());
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNode));
final Environment environment = TestEnvironment.newEnvironment(
Settings.builder().put(internalCluster().getDefaultSettings()).put(dataNodeDataPathSettings).build());
@@ -31,22 +31,31 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class GatewayMetaStatePersistedStateTests extends ESTestCase {
private NodeEnvironment nodeEnvironment;
@@ -309,4 +318,95 @@ public void testStatePersistedOnLoad() throws IOException {
}
}

public void testDataOnlyNodePersistence() throws Exception {
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
final TransportService transportService = mock(TransportService.class);
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
when(transportService.getThreadPool()).thenReturn(threadPool);
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
final PersistedClusterStateService persistedClusterStateService =
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE);
gateway.start(settings, transportService, clusterService,
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));

// generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration
CoordinationMetaData coordinationMetaData;
do {
coordinationMetaData = createCoordinationMetaData(randomNonNegativeLong());
} while (coordinationMetaData.getLastAcceptedConfiguration().equals(coordinationMetaData.getLastCommittedConfiguration()));

ClusterState state = createClusterState(randomNonNegativeLong(),
MetaData.builder().coordinationMetaData(coordinationMetaData)
.clusterUUID(randomAlphaOfLength(10)).build());
persistedState.setLastAcceptedState(state);
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetaData persistedCoordinationMetaData =
persistedClusterStateService.loadBestOnDiskState().metaData.coordinationMetaData();
assertThat(persistedCoordinationMetaData.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetaData.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));

persistedState.markLastAcceptedStateAsCommitted();
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

CoordinationMetaData expectedCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData)
.lastCommittedConfiguration(coordinationMetaData.getLastAcceptedConfiguration()).build();
ClusterState expectedClusterState =
ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData)
.clusterUUID(state.metaData().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetaData = persistedClusterStateService.loadBestOnDiskState().metaData.coordinationMetaData();
assertThat(persistedCoordinationMetaData.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetaData.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metaData.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
long currentTerm = state.term();
for (int i = 0; i < 1000; i++) {
if (rarely()) {
// bump term
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
persistedState.setCurrentTerm(currentTerm);
} else {
// update cluster state
final int numberOfShards = randomIntBetween(1, 5);
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
final IndexMetaData indexMetaData = createIndexMetaData(indexName, numberOfShards, i);
state = createClusterState(state.version() + 1,
MetaData.builder().coordinationMetaData(createCoordinationMetaData(term)).put(indexMetaData, false).build());
persistedState.setLastAcceptedState(state);
}
}
assertEquals(currentTerm, persistedState.getCurrentTerm());
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));

gateway.close();

try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
reloadedPersistedState.getLastAcceptedState());
assertNotNull(reloadedPersistedState.getLastAcceptedState().metaData().index(indexName));
}

ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}

}
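
Because data-only nodes now persist cluster state asynchronously, the tests above poll GatewayMetaState#allPendingAsyncStatesWritten (wrapped in assertBusy) before stopping a node or reloading its on-disk state, otherwise the latest term and cluster state might not have reached disk yet. A minimal sketch of such a wait outside the test framework might look like the following; the helper name, timeout handling, and poll interval are assumptions made for the example.

// Poll until all queued async cluster-state writes have been flushed, failing after
// the given timeout. Hypothetical helper for illustration only; not part of the PR.
static void awaitPendingStateWrites(GatewayMetaState gatewayMetaState, long timeoutMillis) throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeoutMillis;
    while (gatewayMetaState.allPendingAsyncStatesWritten() == false) {
        if (System.currentTimeMillis() > deadline) {
            throw new AssertionError("pending cluster state writes did not complete within " + timeoutMillis + "ms");
        }
        Thread.sleep(10); // brief back-off before polling again
    }
}
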