
Flush inactive shards #31965

Closed

Description

@ywelsch

We currently have logic that triggers a sync flush when a primary shard becomes inactive (after 5 minutes of no write activity on the primary shard). The goal is to ensure that sync flush markers are in place after a period of inactivity, so that a full cluster or rolling restart of nodes results in quick peer recoveries when there is no write activity on the respective shard. With operation-based recoveries, we also provide fast recoveries when there is write activity during node restarts. Operation-based recovery can, however, more frequently lead to situations where a replica shard becomes inactive yet not all of its searchable segments are flushed to disk, because flushing is only triggered when a primary becomes inactive and not by subsequent recoveries of replicas. This results in unnecessary extra storage (more translog generations and more Lucene segments) and possibly slows down future store- and peer-based recoveries. /cc @jpountz
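
For reference, the synced flush that this inactivity logic performs can also be requested explicitly, for example before a planned restart. A minimal sketch, assuming the Java client's prepareSyncedFlush builder and a hypothetical index named "test" (not part of this issue):

import org.elasticsearch.action.admin.indices.flush.SyncedFlushResponse;
import org.elasticsearch.client.Client;

public class ManualSyncedFlushSketch {

    // Hypothetical helper, not part of this issue: explicitly seals all copies of "test".
    // A synced flush performs a regular flush on each active copy (committing searchable
    // segments) and then writes a shared sync_id marker used for fast peer recoveries.
    static SyncedFlushResponse sealIndex(Client client) {
        return client.admin().indices().prepareSyncedFlush("test").get();
    }
}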

The following test illustrates the issue:

package org.elasticsearch.indices.flush;

import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.List;

import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class FlushOnInactivityIT extends ESIntegTestCase {

    public void testFlushOnInactivity() throws Exception {
        List<String> nodes = internalCluster().startNodes(2,
            Settings.builder().put(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.getKey(), "3s").build());

        client().admin().indices().prepareCreate("test").get();

        ensureGreen("test");

        index("test", "_doc", "1");
        refresh("test"); // create segment
        index("test", "_doc", "2");
        refresh("test"); // create segment

        internalCluster().restartNode(nodes.get(0), new InternalTestCluster.RestartCallback() {

            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                // while the restarted node is down, the shard copy on the remaining node
                // should eventually have all of its searchable segments committed to disk
                assertBusySegmentsFlushed(client(nodes.get(1)), "test");
                return super.onNodeStopped(nodeName);
            }

        });

        ensureGreen("test");

        assertBusySegmentsFlushed(client(), "test");
    }

    // asserts that, for every shard copy of the index, the number of committed segments
    // eventually equals the number of searchable segments, i.e. nothing is left unflushed
    private void assertBusySegmentsFlushed(Client client, String index) throws Exception {
        assertBusy(() -> {
            for (IndexShardSegments indexShardSegments : client.admin().indices().prepareSegments(index).get().getIndices().get(index)
                .getShards().values()) {
                for (ShardSegments shardSegments : indexShardSegments) {
                    assertThat(shardSegments.getNumberOfCommitted(), equalTo(shardSegments.getNumberOfSearch()));
                }
            }
        });
    }

}

Labels

:Distributed Indexing/Engine (Anything around managing Lucene and the Translog in an open shard.)
:Distributed Indexing/Recovery (Anything around constructing a new shard, either from a local or a remote source.)
>bug
