Skip to content

Commit

Permalink
Fix recovery translog stats totals when recovering from store
Browse files Browse the repository at this point in the history
Recovery from store fails to correctly set the translog recovery stats. This fixes it and tightens up the logic bringing it all to IndexShard (previously it was set by the recovery logic).

Closes elastic#15974
Closes elastic#16493
  • Loading branch information
bleskes committed Feb 8, 2016
1 parent 7bfb1df commit 4decc72
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ void cancelRelocation() {
}

/**
* Moves the shard from started to initializing and bumps the version
* Moves the shard from started to initializing
*/
void reinitializeShard() {
ensureNotFrozen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.common.Nullable;
Expand All @@ -68,7 +67,6 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -233,20 +231,7 @@ protected void recoverFromTranslog(EngineConfig engineConfig, Translog.TranslogG
final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer();
try {
Translog.Snapshot snapshot = translog.newSnapshot();
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
try {
handler.performRecoveryOperation(this, operation, true);
opsRecovered++;
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
logger.info("ignoring recovery of a corrupt translog entry", e);
} else {
throw e;
}
}
}
opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
} catch (Throwable e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
Expand Down
21 changes: 18 additions & 3 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.index.SearchSlowLog;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
Expand Down Expand Up @@ -89,13 +90,12 @@
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.SearchSlowLog;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.suggest.stats.ShardSuggestMetric;
Expand All @@ -105,8 +105,8 @@
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.percolator.PercolatorService;
Expand Down Expand Up @@ -874,6 +874,12 @@ public int performBatchRecovery(Iterable<Translog.Operation> operations) {
* After the store has been recovered, we need to start the engine in order to apply operations
*/
public void performTranslogRecovery(boolean indexExists) {
if (indexExists == false) {
// note: these are set when recovering from the translog
final RecoveryState.Translog translogStats = recoveryState().getTranslog();
translogStats.totalOperations(0);
translogStats.totalOperationsOnStart(0);
}
internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}
Expand Down Expand Up @@ -1387,6 +1393,15 @@ protected void operationProcessed() {
assert recoveryState != null;
recoveryState.getTranslog().incrementRecoveredOperations();
}

@Override
public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
assert recoveryState != null;
RecoveryState.Translog translogStats = recoveryState.getTranslog();
translogStats.totalOperations(snapshot.totalOperations());
translogStats.totalOperationsOnStart(snapshot.totalOperations());
return super.recoveryFromSnapshot(engine, snapshot);
}
};
return new EngineConfig(shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ private void internalRecoverFromStore(IndexShard indexShard, boolean indexShould
logger.trace("cleaning existing shard, shouldn't exists");
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(Lucene.STANDARD_ANALYZER).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
writer.close();
recoveryState.getTranslog().totalOperations(0);
}
}
} catch (Throwable e) {
Expand All @@ -224,10 +223,6 @@ private void internalRecoverFromStore(IndexShard indexShard, boolean indexShould
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
if (indexShouldExists == false) {
recoveryState.getTranslog().totalOperations(0);
recoveryState.getTranslog().totalOperationsOnStart(0);
}
indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -77,6 +78,25 @@ int performBatchRecovery(Engine engine, Iterable<Translog.Operation> operations)
return numOps;
}

public int recoveryFromSnapshot(Engine engine, Translog.Snapshot snapshot) throws IOException {
Translog.Operation operation;
int opsRecovered = 0;
while ((operation = snapshot.next()) != null) {
try {
performRecoveryOperation(engine, operation, true);
opsRecovered++;
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.BAD_REQUEST) {
// mainly for MapperParsingException and Failure to detect xcontent
logger.info("ignoring recovery of a corrupt translog entry", e);
} else {
throw e;
}
}
}
return opsRecovered;
}

public static class BatchOperationException extends ElasticsearchException {

private final int completedOperations;
Expand Down Expand Up @@ -182,6 +202,7 @@ protected void operationProcessed() {
// noop
}


/**
* Returns the recovered types modifying the mapping during the recovery
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public static void reinit(ShardRouting routing) {
routing.reinitializeShard();
}

public static void reinit(ShardRouting routing, UnassignedInfo.Reason reason) {
routing.reinitializeShard();
routing.updateUnassignedInfo(new UnassignedInfo(reason, "test_reinit"));
}

public static void moveToUnassigned(ShardRouting routing, UnassignedInfo info) {
routing.moveToUnassigned(info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.NodeServicesProvider;
Expand Down Expand Up @@ -865,10 +864,11 @@ public void testRecoverFromStore() throws IOException {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);

int translogOps = 1;
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
if (randomBoolean()) {
client().admin().indices().prepareFlush().get();
translogOps = 0;
}
ShardRouting routing = new ShardRouting(shard.routingEntry());
test.removeShard(0, "b/c simon says so");
Expand All @@ -878,13 +878,47 @@ public void testRecoverFromStore() throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, localNode));
assertTrue(newShard.recoverFromStore(localNode));
assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 1);
}

public void testRecoverFromCleanStore() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService test = indicesService.indexService("test");
final IndexShard shard = test.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(randomBoolean()).get();
if (randomBoolean()) {
client().admin().indices().prepareFlush().get();
}
ShardRouting routing = new ShardRouting(shard.routingEntry());
test.removeShard(0, "b/c simon says so");
ShardRoutingHelper.reinit(routing, UnassignedInfo.Reason.INDEX_CREATED);
IndexShard newShard = test.createShard(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
localNode));
assertTrue(newShard.recoverFromStore(localNode));
assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
assertEquals(0, newShard.recoveryState().getTranslog().totalOperationsOnStart());
assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f);
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
SearchResponse response = client().prepareSearch().get();
assertHitCount(response, 0);
}

public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
createIndex("test");
ensureGreen();
Expand Down Expand Up @@ -1187,7 +1221,8 @@ public void testTranslogRecoverySyncsTranslog() throws IOException {
List<Translog.Operation> operations = new ArrayList<>();
operations.add(new Translog.Index("testtype", "1", jsonBuilder().startObject().field("foo", "bar").endObject().bytes().toBytes()));
newShard.prepareForIndexRecovery();
newShard.performTranslogRecovery(true);
newShard.recoveryState().getTranslog().totalOperations(operations.size());
newShard.skipTranslogRecovery();
newShard.performBatchRecovery(operations);
assertFalse(newShard.getTranslog().syncNeeded());
}
Expand Down

0 comments on commit 4decc72

Please sign in to comment.