Description
We currently have logic that triggers a sync flush when a primary shard becomes inactive, i.e. after 5 minutes with no write activity on that primary. The goal is to ensure that sync flush markers are in place after a period of inactivity, so that a full-cluster or rolling restart results in quick peer recoveries for shards that see no writes. Operation-based recovery additionally gives us fast recoveries when there is write activity during node restarts.

Operation-based recovery can, however, more frequently lead to situations where a replica shard becomes inactive while not all of its searchable segments have been flushed to disk, because the flush is only triggered when a primary becomes inactive and is not triggered by subsequent recoveries of replicas. This results in unnecessary extra storage (more translog generations and more Lucene segments) and possibly slows down future store-based and peer recoveries.

/cc: @jpountz
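For context, here is a minimal, purely illustrative sketch of the kind of inactivity check described above. The class, the ShardCopy interface and the method names are hypothetical and are not the actual Elasticsearch implementation; the sketch only shows why replica copies never reach the flush:

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the actual Elasticsearch code: an inactivity check
// that only ever flushes primary copies. Idle replicas with unflushed
// searchable segments are never picked up by this path.
public class InactiveShardFlusher {

    // Minimal stand-in for a shard copy; all names are illustrative only.
    interface ShardCopy {
        boolean isPrimary();
        long lastWriteNanos();
        void flush(); // commit searchable segments and trim translog generations
    }

    private final long inactiveTimeNanos;

    InactiveShardFlusher(long inactiveTime, TimeUnit unit) {
        this.inactiveTimeNanos = unit.toNanos(inactiveTime);
    }

    // Invoked periodically; flushes primaries that saw no writes for the configured interval.
    void checkIdle(List<ShardCopy> copies, long nowNanos) {
        for (ShardCopy copy : copies) {
            boolean idle = nowNanos - copy.lastWriteNanos() >= inactiveTimeNanos;
            if (idle && copy.isPrimary()) { // replicas never reach flush() here
                copy.flush();
            }
        }
    }
}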
The following test illustrates the issue:
package org.elasticsearch.indices.flush;

import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.List;

import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class FlushOnInactivityIT extends ESIntegTestCase {

    public void testFlushOnInactivity() throws Exception {
        // lower the inactivity threshold from the default 5 minutes so the test hits it quickly
        List<String> nodes = internalCluster().startNodes(2,
            Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build());
        client().admin().indices().prepareCreate("test").get();
        ensureGreen("test");
        index("test", "_doc", "1");
        refresh("test"); // create segment
        index("test", "_doc", "2");
        refresh("test"); // create segment
        internalCluster().restartNode(nodes.get(0), new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                // the remaining copy (the primary) becomes inactive and is flushed
                assertBusySegmentsFlushed(client(nodes.get(1)), "test");
                return super.onNodeStopped(nodeName);
            }
        });
        ensureGreen("test");
        // illustrates the issue: the replica brought back by recovery does not get flushed
        assertBusySegmentsFlushed(client(), "test");
    }

    private void assertBusySegmentsFlushed(Client client, String index) throws Exception {
        assertBusy(() -> {
            for (IndexShardSegments indexShardSegments : client.admin().indices().prepareSegments(index).get().getIndices().get(index)
                    .getShards().values()) {
                for (ShardSegments shardSegments : indexShardSegments) {
                    assertThat(shardSegments.getNumberOfCommitted(), equalTo(shardSegments.getNumberOfSearch()));
                }
            }
        });
    }
}
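For reference, the assertion helper passes only once every shard copy reports as many committed segments as searchable segments, i.e. everything that is searchable has also been flushed to disk. Purely as an illustration of the direction the description points at, reusing the hypothetical names from the sketch above rather than anything in the actual Elasticsearch code, the gap goes away if the inactivity check stops gating on the primary:

// Hypothetical variant of checkIdle(...) from the sketch above: drop the primary-only
// gate so that idle replicas with uncommitted searchable segments are flushed as well.
void checkIdleAllCopies(List<ShardCopy> copies, long nowNanos) {
    for (ShardCopy copy : copies) {
        if (nowNanos - copy.lastWriteNanos() >= inactiveTimeNanos) {
            copy.flush(); // applies to primaries and replicas alike
        }
    }
}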