Skip to content

Make cluster state writer resilient to disk issues #50805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -59,6 +61,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -113,8 +116,7 @@ public void start(Settings settings, TransportService transportService, ClusterS
}
}

final PersistedClusterStateService.Writer persistenceWriter = persistedClusterStateService.createWriter();
final PersistedState persistedState;
PersistedState persistedState = null;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(transportService, clusterService,
Expand All @@ -123,10 +125,10 @@ public void start(Settings settings, TransportService transportService, ClusterS
.metaData(upgradeMetaDataForNode(metaData, metaDataIndexUpgradeService, metaDataUpgrader))
.build());
if (DiscoveryNode.isMasterNode(settings)) {
persistedState = new LucenePersistedState(persistenceWriter, currentTerm, clusterState);
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(settings, transportService.getThreadPool(),
new LucenePersistedState(persistenceWriter, currentTerm, clusterState));
new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState));
}
if (DiscoveryNode.isDataNode(settings)) {
metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality)
Expand All @@ -139,7 +141,7 @@ public void start(Settings settings, TransportService transportService, ClusterS
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(persistenceWriter);
IOUtils.closeWhileHandlingException(persistedState);
}
}

Expand Down Expand Up @@ -392,11 +394,15 @@ static class LucenePersistedState implements PersistedState {

private long currentTerm;
private ClusterState lastAcceptedState;
private final PersistedClusterStateService.Writer persistenceWriter;
private final PersistedClusterStateService persistedClusterStateService;

LucenePersistedState(PersistedClusterStateService.Writer persistenceWriter, long currentTerm, ClusterState lastAcceptedState)
// As the close method can be concurrently called to the other PersistedState methods, this class has extra protection in place.
private final AtomicReference<PersistedClusterStateService.Writer> persistenceWriter = new AtomicReference<>();
boolean writeNextStateFully;

LucenePersistedState(PersistedClusterStateService persistedClusterStateService, long currentTerm, ClusterState lastAcceptedState)
throws IOException {
this.persistenceWriter = persistenceWriter;
this.persistedClusterStateService = persistedClusterStateService;
this.currentTerm = currentTerm;
this.lastAcceptedState = lastAcceptedState;
// Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that
Expand All @@ -406,7 +412,18 @@ static class LucenePersistedState implements PersistedState {
// In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance,
// this is true if there's only one data path on this master node, and the commit we just loaded was already written out
// by this version of Elasticsearch. TODO TBD should we avoid indexing when possible?
persistenceWriter.writeFullStateAndCommit(currentTerm, lastAcceptedState);
final PersistedClusterStateService.Writer writer = persistedClusterStateService.createWriter();
try {
writer.writeFullStateAndCommit(currentTerm, lastAcceptedState);
} catch (Exception e) {
try {
writer.close();
} catch (Exception e2) {
e.addSuppressed(e2);
}
throw e;
}
persistenceWriter.set(writer);
}

@Override
Expand All @@ -421,32 +438,74 @@ public ClusterState getLastAcceptedState() {

@Override
public void setCurrentTerm(long currentTerm) {
persistenceWriter.commit(currentTerm, lastAcceptedState.version());
try {
if (writeNextStateFully) {
getWriterSafe().writeFullStateAndCommit(currentTerm, lastAcceptedState);
writeNextStateFully = false;
} else {
getWriterSafe().commit(currentTerm, lastAcceptedState.version());
}
} catch (Exception e) {
handleExceptionOnWrite(e);
}
this.currentTerm = currentTerm;
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (clusterState.term() != lastAcceptedState.term()) {
assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
// In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so
// it's simplest to write everything again.
persistenceWriter.writeFullStateAndCommit(currentTerm, clusterState);
if (writeNextStateFully) {
getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
writeNextStateFully = false;
} else {
// Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing.
persistenceWriter.writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
if (clusterState.term() != lastAcceptedState.term()) {
assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term();
// In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state,
// so it's simplest to write everything again.
getWriterSafe().writeFullStateAndCommit(currentTerm, clusterState);
} else {
// Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing.
getWriterSafe().writeIncrementalStateAndCommit(currentTerm, lastAcceptedState, clusterState);
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Exception e) {
handleExceptionOnWrite(e);
}

lastAcceptedState = clusterState;
}

// Returns the current writer, transparently creating a replacement if the previous writer
// closed itself after a failed write. Throws AlreadyClosedException once this PersistedState
// has been closed (close() swaps the AtomicReference to null and it stays null forever).
private PersistedClusterStateService.Writer getWriterSafe() {
final PersistedClusterStateService.Writer writer = persistenceWriter.get();
if (writer == null) {
// close() already ran; no further writes are possible.
throw new AlreadyClosedException("persisted state has been closed");
}
if (writer.isOpen()) {
return writer;
} else {
// The writer closed itself (e.g. after a disk problem); try to install a fresh one.
try {
final PersistedClusterStateService.Writer newWriter = persistedClusterStateService.createWriter();
if (persistenceWriter.compareAndSet(writer, newWriter)) {
return newWriter;
} else {
// The CAS can only lose to a concurrent close() — per the assertion message,
// concurrent calls to getWriterSafe itself are not expected — so the reference
// must now be null; discard the writer we just created.
assert persistenceWriter.get() == null : "expected no concurrent calls to getWriterSafe";
newWriter.close();
throw new AlreadyClosedException("persisted state has been closed");
}
} catch (Exception e) {
// createWriter()/close() declare checked exceptions; surface them unchecked.
throw ExceptionsHelper.convertToRuntime(e);
}
}
}

// A write failed part-way through, so the incremental bookkeeping can no longer be trusted:
// flag the next write to rewrite the full state, then rethrow the failure unchecked.
private void handleExceptionOnWrite(Exception failure) {
    writeNextStateFully = true;
    throw ExceptionsHelper.convertToRuntime(failure);
}

@Override
public void close() throws IOException {
persistenceWriter.close();
IOUtils.close(persistenceWriter.getAndSet(null));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.Bits;
Expand Down Expand Up @@ -80,6 +81,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntPredicate;

/**
Expand Down Expand Up @@ -497,13 +499,17 @@ void flush() throws IOException {
this.indexWriter.flush();
}

void commit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException {
void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException {
final Map<String, String> commitData = new HashMap<>(COMMIT_DATA_SIZE);
commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm));
commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion));
commitData.put(NODE_VERSION_KEY, Integer.toString(Version.CURRENT.id));
commitData.put(NODE_ID_KEY, nodeId);
indexWriter.setLiveCommitData(commitData.entrySet());
indexWriter.prepareCommit();
}

// Second phase of the two-phase commit: completes the commit started by
// prepareCommit(nodeId, currentTerm, lastAcceptedVersion) above.
void commit() throws IOException {
indexWriter.commit();
}

Expand All @@ -520,30 +526,62 @@ public static class Writer implements Closeable {
private final BigArrays bigArrays;

boolean fullStateWritten = false;
private final AtomicBoolean closed = new AtomicBoolean();

private Writer(List<MetaDataIndexWriter> metaDataIndexWriters, String nodeId, BigArrays bigArrays) {
this.metaDataIndexWriters = metaDataIndexWriters;
this.nodeId = nodeId;
this.bigArrays = bigArrays;
}

private void ensureOpen() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised we need to check this, given that the caller has already checked we're open in reloadWriterIfNecessary(). Can you clarify?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was more about separation of concerns: I did not want Writer to rely on the caller using it correctly, but to enforce correct usage itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should be asserting that it's open instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I remembered. We can't make this an assertion as it breaks due to concurrency between close() and the other methods.

if (closed.get()) {
throw new AlreadyClosedException("cluster state writer is closed already");
}
}

// Reports whether this writer is still usable; false once close() has run.
public boolean isOpen() {
    return !closed.get();
}

// If any underlying Lucene IndexWriter has hit a tragic exception or is no longer open,
// this Writer cannot commit consistently any more, so close it as a whole. A failure while
// closing is only logged, so the caller's original write exception is the one that propagates.
private void closeIfAnyIndexWriterHasTragedyOrIsClosed() {
    final boolean anyWriterUnusable = metaDataIndexWriters.stream()
        .anyMatch(w -> w.indexWriter.getTragicException() != null || w.indexWriter.isOpen() == false);
    if (anyWriterUnusable) {
        try {
            close();
        } catch (Exception e) {
            logger.warn("failed on closing cluster state writer", e);
        }
    }
}

/**
* Overrides and commits the given current term and cluster state
*/
public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException {
overwriteMetaData(clusterState.metaData());
commit(currentTerm, clusterState.version());
fullStateWritten = true;
ensureOpen();
try {
overwriteMetaData(clusterState.metaData());
commit(currentTerm, clusterState.version());
fullStateWritten = true;
} finally {
closeIfAnyIndexWriterHasTragedyOrIsClosed();
}
}

/**
* Updates and commits the given cluster state update
*/
void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState,
ClusterState clusterState) throws IOException {
ensureOpen();
assert fullStateWritten : "Need to write full state first before doing incremental writes";
updateMetaData(previousClusterState.metaData(), clusterState.metaData());
commit(currentTerm, clusterState.version());
try {
updateMetaData(previousClusterState.metaData(), clusterState.metaData());
commit(currentTerm, clusterState.version());
} finally {
closeIfAnyIndexWriterHasTragedyOrIsClosed();
}
}

/**
Expand Down Expand Up @@ -634,23 +672,48 @@ private void addMetaData(MetaData metaData) throws IOException {
}
}

public void commit(long currentTerm, long lastAcceptedVersion) {
public void commit(long currentTerm, long lastAcceptedVersion) throws IOException {
ensureOpen();
try {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion);
metaDataIndexWriter.prepareCommit(nodeId, currentTerm, lastAcceptedVersion);
}
} catch (Exception e) {
try {
close();
} catch (Exception e2) {
logger.warn("failed on closing cluster state writer", e2);
e.addSuppressed(e2);
}
throw e;
} finally {
closeIfAnyIndexWriterHasTragedyOrIsClosed();
}
try {
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) {
metaDataIndexWriter.commit();
}
} catch (IOException e) {
// The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the
// data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and
// retry from the beginning.
try {
close();
} catch (Exception e2) {
e.addSuppressed(e2);
}
throw new IOError(e);
} finally {
closeIfAnyIndexWriterHasTragedyOrIsClosed();
}
}

@Override
public void close() throws IOException {
logger.trace("closing");
IOUtils.close(metaDataIndexWriters);
logger.trace("closing PersistedClusterStateService.Writer");
if (closed.compareAndSet(false, true)) {
IOUtils.close(metaDataIndexWriters);
}
}

private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException {
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,6 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(() -> stopWatch.stop().start("gateway_meta_state"));
toClose.add(injector.getInstance(GatewayMetaState.class));

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand All @@ -882,8 +880,11 @@ public synchronized void close() throws IOException {
// Don't call shutdownNow here, it might break ongoing operations on Lucene indices.
// See https://issues.apache.org/jira/browse/LUCENE-7248. We call shutdownNow in
// awaitClose if the node doesn't finish closing within the specified time.
toClose.add(() -> stopWatch.stop().start("node_environment"));

toClose.add(() -> stopWatch.stop().start("gateway_meta_state"));
toClose.add(injector.getInstance(GatewayMetaState.class));

toClose.add(() -> stopWatch.stop().start("node_environment"));
toClose.add(injector.getInstance(NodeEnvironment.class));
toClose.add(stopWatch::stop);

Expand Down
Loading