Skip to content

Run CheckIndex on metadata index before loading (#73239) #74173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -46,6 +47,7 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.common.logging.Loggers;
Expand All @@ -70,6 +72,8 @@
import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand Down Expand Up @@ -306,6 +310,14 @@ public static void overrideVersion(Version newVersion, Path... dataPaths) throws
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
*/
public OnDiskState loadBestOnDiskState() throws IOException {
    // Production entry point: always run the corruption check before loading.
    // Tests can skip the (expensive) check via the package-private overload.
    final boolean checkClean = true;
    return loadBestOnDiskState(checkClean);
}

/**
* Loads the best available on-disk cluster state. Returns {@link OnDiskState#NO_ON_DISK_STATE} if no such state was found.
* @param checkClean whether to check the index for corruption before loading, only for tests
*/
OnDiskState loadBestOnDiskState(boolean checkClean) throws IOException {
String committedClusterUuid = null;
Path committedClusterUuidPath = null;
OnDiskState bestOnDiskState = OnDiskState.NO_ON_DISK_STATE;
Expand All @@ -317,39 +329,63 @@ public OnDiskState loadBestOnDiskState() throws IOException {
for (final Path dataPath : dataPaths) {
final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME);
if (Files.exists(indexPath)) {
try (Directory directory = createDirectory(indexPath);
DirectoryReader directoryReader = DirectoryReader.open(directory)) {
final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);
try (Directory directory = createDirectory(indexPath)) {
if (checkClean) {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
final boolean isClean;
try (PrintStream printStream = new PrintStream(outputStream, true, StandardCharsets.UTF_8.name());
CheckIndex checkIndex = new CheckIndex(directory)) {
checkIndex.setInfoStream(printStream);
checkIndex.setChecksumsOnly(true);
isClean = checkIndex.checkIndex().clean;
}

if (nodeId.equals(onDiskState.nodeId) == false) {
throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId +
"] in [" + dataPath + "] but expected [" + nodeId + "]");
if (isClean == false) {
if (logger.isErrorEnabled()) {
for (final String line : outputStream.bytes().utf8ToString().split("\\r?\\n")) {
logger.error("checkIndex: {}", line);
}
}
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
"] has been changed by an external force after it was last written by Elasticsearch and is now unreadable");
}
}
}

if (onDiskState.metadata.clusterUUIDCommitted()) {
if (committedClusterUuid == null) {
committedClusterUuid = onDiskState.metadata.clusterUUID();
committedClusterUuidPath = dataPath;
} else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
"] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
+ dataPath + "]");

try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader);

if (nodeId.equals(onDiskState.nodeId) == false) {
throw new IllegalStateException("the index containing the cluster metadata under the data path [" + dataPath +
"] belongs to a node with ID [" + onDiskState.nodeId + "] but this node's ID is [" + nodeId + "]");
}
}

if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
maxCurrentTermOnDiskState = onDiskState;
}
if (onDiskState.metadata.clusterUUIDCommitted()) {
if (committedClusterUuid == null) {
committedClusterUuid = onDiskState.metadata.clusterUUID();
committedClusterUuidPath = dataPath;
} else if (committedClusterUuid.equals(onDiskState.metadata.clusterUUID()) == false) {
throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid +
"] in [" + committedClusterUuidPath + "] and [" + onDiskState.metadata.clusterUUID() + "] in ["
+ dataPath + "]");
}
}

if (maxCurrentTermOnDiskState.empty() || maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm) {
maxCurrentTermOnDiskState = onDiskState;
}

long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
if (bestOnDiskState.empty()
|| acceptedTerm > maxAcceptedTerm
|| (acceptedTerm == maxAcceptedTerm
long acceptedTerm = onDiskState.metadata.coordinationMetadata().term();
long maxAcceptedTerm = bestOnDiskState.metadata.coordinationMetadata().term();
if (bestOnDiskState.empty()
|| acceptedTerm > maxAcceptedTerm
|| (acceptedTerm == maxAcceptedTerm
&& (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
bestOnDiskState = onDiskState;
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
&& onDiskState.currentTerm > bestOnDiskState.currentTerm))) {
bestOnDiskState = onDiskState;
}
}
} catch (IndexNotFoundException e) {
logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void testDataOnlyNodePersistence() throws Exception {
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
CoordinationMetadata persistedCoordinationMetadata =
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
Expand All @@ -387,12 +387,12 @@ public void testDataOnlyNodePersistence() throws Exception {
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();

assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState(false).metadata.coordinationMetadata();
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
equalTo(GatewayMetaState.AsyncPersistedState.staleStateConfiguration));
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());
assertTrue(persistedClusterStateService.loadBestOnDiskState(false).metadata.clusterUUIDCommitted());

// generate a series of updates and check if batching works
final String indexName = randomAlphaOfLength(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.ExtrasFS;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
Expand Down Expand Up @@ -40,12 +41,15 @@
import org.elasticsearch.gateway.PersistedClusterStateService.Writer;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLogAppender;
import org.elasticsearch.test.junit.annotations.TestLogging;

import java.io.IOError;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -56,12 +60,16 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.lucene.index.IndexWriter.WRITE_LOCK_NAME;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

public class PersistedClusterStateServiceTests extends ESTestCase {

Expand All @@ -79,7 +87,7 @@ public void testPersistsAndReloadsTerm() throws IOException {
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(0L));
try (Writer writer = persistedClusterStateService.createWriter()) {
writer.writeFullStateAndCommit(newTerm, ClusterState.EMPTY_STATE);
assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
assertThat(persistedClusterStateService.loadBestOnDiskState(false).currentTerm, equalTo(newTerm));
}

assertThat(persistedClusterStateService.loadBestOnDiskState().currentTerm, equalTo(newTerm));
Expand Down Expand Up @@ -218,8 +226,12 @@ public void testFailsOnMismatchedNodeIds() throws IOException {
.toArray(Path[]::new), nodeIds[0], xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L
).loadBestOnDiskState()).getMessage();
assertThat(message,
allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1])));
assertThat(message, allOf(
containsString("the index containing the cluster metadata under the data path"),
containsString("belongs to a node with ID"),
containsString("but this node's ID is"),
containsString(nodeIds[0]),
containsString(nodeIds[1])));
assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2),
Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString())));
}
Expand Down Expand Up @@ -315,7 +327,7 @@ public void testFailsIfFreshestStateIsInStaleTerm() throws IOException {
try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) {
try (Writer writer = newPersistedClusterStateService(nodeEnvironment).createWriter()) {
final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService(nodeEnvironment)
.loadBestOnDiskState();
.loadBestOnDiskState(false);
final ClusterState clusterState = clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
writeState(writer, onDiskState.currentTerm, ClusterState.builder(clusterState)
.metadata(Metadata.builder(clusterState.metadata()).version(2)
Expand Down Expand Up @@ -851,6 +863,30 @@ public void testSlowLogging() throws IOException, IllegalAccessException {
}
}

public void testFailsIfCorrupt() throws IOException {
    // Writes a trivial cluster state, flips bytes in one randomly-chosen metadata index
    // file, then verifies that loading fails with the "external force" error message.
    try (NodeEnvironment env = newNodeEnvironment(createDataPaths())) {
        final PersistedClusterStateService clusterStateService = newPersistedClusterStateService(env);

        try (Writer writer = clusterStateService.createWriter()) {
            writer.writeFullStateAndCommit(1, ClusterState.EMPTY_STATE);
        }

        // Collect the real index files, skipping mock-filesystem extras and the write lock,
        // and corrupt one of them at random.
        final ArrayList<Path> corruptionCandidates = new ArrayList<>();
        try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(env.nodeDataPaths()[0].resolve("_state"))) {
            for (final Path file : directoryStream) {
                final String filename = file.getFileName().toString();
                if (ExtrasFS.isExtra(filename) == false && filename.equals(WRITE_LOCK_NAME) == false) {
                    corruptionCandidates.add(file);
                }
            }
        }
        CorruptionUtils.corruptFile(random(), randomFrom(corruptionCandidates));

        final IllegalStateException exception =
            expectThrows(IllegalStateException.class, clusterStateService::loadBestOnDiskState);
        assertThat(exception.getMessage(), allOf(
            startsWith("the index containing the cluster metadata under the data path ["),
            endsWith("] has been changed by an external force after it was last written by Elasticsearch and is now unreadable")));
    }
}

private void assertExpectedLogs(long currentTerm, ClusterState previousState, ClusterState clusterState,
PersistedClusterStateService.Writer writer, MockLogAppender.LoggingExpectation expectation)
throws IllegalAccessException, IOException {
Expand Down Expand Up @@ -896,7 +932,7 @@ private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException
}

// Loads the freshest on-disk state via the test overload (checkClean == false, skipping the
// CheckIndex corruption scan) and rebuilds a ClusterState from its version and metadata.
private static ClusterState loadPersistedClusterState(PersistedClusterStateService persistedClusterStateService) throws IOException {
    final PersistedClusterStateService.OnDiskState onDiskState = persistedClusterStateService.loadBestOnDiskState(false);
    return clusterStateFromMetadata(onDiskState.lastAcceptedVersion, onDiskState.metadata);
}

Expand Down