-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Make cluster state writer resilient to disk issues #50805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
75ec3ee
1c7eb4c
333f42a
7082947
069730c
8a9c864
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
import org.apache.lucene.search.Scorer; | ||
import org.apache.lucene.search.TermQuery; | ||
import org.apache.lucene.search.Weight; | ||
import org.apache.lucene.store.AlreadyClosedException; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.SimpleFSDirectory; | ||
import org.apache.lucene.util.Bits; | ||
|
@@ -80,6 +81,7 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.function.IntPredicate; | ||
|
||
/** | ||
|
@@ -497,13 +499,17 @@ void flush() throws IOException { | |
this.indexWriter.flush(); | ||
} | ||
|
||
void commit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { | ||
void prepareCommit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { | ||
final Map<String, String> commitData = new HashMap<>(COMMIT_DATA_SIZE); | ||
commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm)); | ||
commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion)); | ||
commitData.put(NODE_VERSION_KEY, Integer.toString(Version.CURRENT.id)); | ||
commitData.put(NODE_ID_KEY, nodeId); | ||
indexWriter.setLiveCommitData(commitData.entrySet()); | ||
indexWriter.prepareCommit(); | ||
} | ||
|
||
void commit() throws IOException { | ||
indexWriter.commit(); | ||
} | ||
|
||
|
@@ -520,30 +526,62 @@ public static class Writer implements Closeable { | |
private final BigArrays bigArrays; | ||
|
||
boolean fullStateWritten = false; | ||
private final AtomicBoolean closed = new AtomicBoolean(); | ||
|
||
private Writer(List<MetaDataIndexWriter> metaDataIndexWriters, String nodeId, BigArrays bigArrays) { | ||
this.metaDataIndexWriters = metaDataIndexWriters; | ||
this.nodeId = nodeId; | ||
this.bigArrays = bigArrays; | ||
} | ||
|
||
private void ensureOpen() { | ||
Review comment 1: I'm surprised we need to check this, given that the caller has already checked that we're open in [inline code reference lost in extraction].
Review comment 2 (reply): This was more about separation of concerns. I did not want [remainder of sentence, an inline code reference, lost in extraction].
Review comment 3: Maybe we should be asserting that it's open instead?
Review comment 4 (reply): Now I remember. We can't make this an assertion, as it breaks due to concurrency between close() and the other methods.
||
if (closed.get()) { | ||
throw new AlreadyClosedException("cluster state writer is closed already"); | ||
} | ||
} | ||
|
||
public boolean isOpen() { | ||
return closed.get() == false; | ||
} | ||
|
||
private void closeIfAnyIndexWriterHasTragedyOrIsClosed() { | ||
if (metaDataIndexWriters.stream().map(writer -> writer.indexWriter) | ||
.anyMatch(iw -> iw.getTragicException() != null || iw.isOpen() == false)) { | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try { | ||
close(); | ||
} catch (Exception e) { | ||
logger.warn("failed on closing cluster state writer", e); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Overrides and commits the given current term and cluster state | ||
*/ | ||
public void writeFullStateAndCommit(long currentTerm, ClusterState clusterState) throws IOException { | ||
overwriteMetaData(clusterState.metaData()); | ||
commit(currentTerm, clusterState.version()); | ||
fullStateWritten = true; | ||
ensureOpen(); | ||
try { | ||
overwriteMetaData(clusterState.metaData()); | ||
commit(currentTerm, clusterState.version()); | ||
fullStateWritten = true; | ||
} finally { | ||
closeIfAnyIndexWriterHasTragedyOrIsClosed(); | ||
} | ||
} | ||
|
||
/** | ||
* Updates and commits the given cluster state update | ||
*/ | ||
void writeIncrementalStateAndCommit(long currentTerm, ClusterState previousClusterState, | ||
ClusterState clusterState) throws IOException { | ||
ensureOpen(); | ||
assert fullStateWritten : "Need to write full state first before doing incremental writes"; | ||
updateMetaData(previousClusterState.metaData(), clusterState.metaData()); | ||
commit(currentTerm, clusterState.version()); | ||
try { | ||
updateMetaData(previousClusterState.metaData(), clusterState.metaData()); | ||
commit(currentTerm, clusterState.version()); | ||
} finally { | ||
closeIfAnyIndexWriterHasTragedyOrIsClosed(); | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -634,23 +672,48 @@ private void addMetaData(MetaData metaData) throws IOException { | |
} | ||
} | ||
|
||
public void commit(long currentTerm, long lastAcceptedVersion) { | ||
public void commit(long currentTerm, long lastAcceptedVersion) throws IOException { | ||
ensureOpen(); | ||
try { | ||
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { | ||
metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion); | ||
metaDataIndexWriter.prepareCommit(nodeId, currentTerm, lastAcceptedVersion); | ||
} | ||
} catch (Exception e) { | ||
try { | ||
close(); | ||
} catch (Exception e2) { | ||
logger.warn("failed on closing cluster state writer", e2); | ||
e.addSuppressed(e2); | ||
} | ||
throw e; | ||
} finally { | ||
closeIfAnyIndexWriterHasTragedyOrIsClosed(); | ||
} | ||
try { | ||
for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { | ||
metaDataIndexWriter.commit(); | ||
} | ||
} catch (IOException e) { | ||
// The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the | ||
// data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and | ||
// retry from the beginning. | ||
try { | ||
close(); | ||
} catch (Exception e2) { | ||
e.addSuppressed(e2); | ||
} | ||
throw new IOError(e); | ||
} finally { | ||
closeIfAnyIndexWriterHasTragedyOrIsClosed(); | ||
DaveCTurner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
logger.trace("closing"); | ||
IOUtils.close(metaDataIndexWriters); | ||
logger.trace("closing PersistedClusterStateService.Writer"); | ||
if (closed.compareAndSet(false, true)) { | ||
IOUtils.close(metaDataIndexWriters); | ||
} | ||
} | ||
|
||
private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException { | ||
|
Uh oh!
There was an error while loading. Please reload this page.