Skip to content

Delete index folder if all shards were allocated away from a data only node #9985

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,14 @@ public void deleteIndexStore(String reason, IndexMetaData metaData) throws IOExc
if (nodeEnv.hasNodeFile()) {
synchronized (this) {
String indexName = metaData.index();
if (indices.containsKey(metaData.index())) {
String localUUid = indices.get(metaData.index()).v1().indexUUID();
throw new ElasticsearchIllegalStateException("Can't delete index store for [" + metaData.getIndex() + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
if (indices.containsKey(indexName)) {
String localUUid = indices.get(indexName).v1().indexUUID();
throw new ElasticsearchIllegalStateException("Can't delete index store for [" + indexName + "] - it's still part of the indices service [" + localUUid+ "] [" + metaData.getUUID() + "]");
}
ClusterState clusterState = clusterService.state();
if (clusterState.metaData().hasIndex(indexName)) {
if (clusterState.metaData().hasIndex(indexName) && (clusterState.nodes().localNode().masterNode() == true)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, isn't the exception wrong? I mean, shouldn't we just skip it in that case?

// we do not delete the store if it is a master eligible node and the index is still in the cluster state
// because we want to keep the meta data for indices around even if no shards are left here
final IndexMetaData index = clusterState.metaData().index(indexName);
throw new ElasticsearchIllegalStateException("Can't delete closed index store for [" + indexName + "] - it's still part of the cluster state [" + index.getUUID() + "] [" + metaData.getUUID() + "]");
}
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/elasticsearch/indices/store/IndicesStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,18 @@ public ClusterState execute(ClusterState currentState) throws Exception {
IndexMetaData indexMeta = clusterState.getMetaData().indices().get(shardId.getIndex());
try {
indicesService.deleteShardStore("no longer used", shardId, indexMeta);
} catch (Exception ex) {
} catch (Throwable ex) {
logger.debug("{} failed to delete unallocated shard, ignoring", ex, shardId);
}
// if the index doesn't exist anymore, delete its store as well, but only if it's a non-master node, since master
// nodes keep the index metadata around
if (indicesService.hasIndex(shardId.getIndex()) == false && currentState.nodes().localNode().masterNode() == false) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering: why would master nodes want to keep the index directories around?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the index metadata

try {
indicesService.deleteIndexStore("no longer used", indexMeta);
} catch (Throwable ex) {
logger.debug("{} failed to delete unallocated index, ignoring", ex, shardId.getIndex());
}
}
return currentState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
Expand All @@ -41,6 +43,8 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
Expand All @@ -53,6 +57,57 @@
@ClusterScope(scope= Scope.TEST, numDataNodes = 0)
public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {

/**
 * Starts one node with {@code node.data=false} and two nodes with {@code node.master=false}, creates
 * a one-shard/one-replica index on them, adds a third {@code node.master=false} node and moves the
 * shard copy from the first of those nodes to the third. The test then asserts that both the shard
 * directory and the index directory are gone from the node the shard was moved away from, while the
 * two nodes that hold a copy still have both directories.
 */
@Test
public void indexCleanup() throws Exception {
    final String index = "test";
    final int shard = 0;

    // The first node carries no data; the following two are not master eligible.
    final String dedicatedMaster = internalCluster().startNode(ImmutableSettings.builder().put("node.data", false));
    final String firstDataNode = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
    final String secondDataNode = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));

    logger.info("--> creating index [test] with one shard and on replica");
    assertAcked(prepareCreate(index).setSettings(
            ImmutableSettings.builder()
                    .put(indexSettings())
                    .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                    .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)));
    ensureGreen(index);

    logger.info("--> making sure that shard and its replica are allocated on node_1 and node_2");
    for (String holder : new String[] {firstDataNode, secondDataNode}) {
        assertThat(Files.exists(shardDirectory(holder, index, shard)), equalTo(true));
        assertThat(Files.exists(indexDirectory(holder, index)), equalTo(true));
    }

    logger.info("--> starting node server3");
    final String thirdDataNode = internalCluster().startNode(ImmutableSettings.builder().put("node.master", false));
    logger.info("--> running cluster_health");
    final ClusterHealthResponse afterJoin = client().admin().cluster().prepareHealth()
            .setWaitForNodes("4")
            .setWaitForRelocatingShards(0)
            .get();
    assertThat(afterJoin.isTimedOut(), equalTo(false));

    // Joining the cluster alone must not change anything on disk: the two original holders keep
    // their directories and the newcomer has none yet.
    for (String holder : new String[] {firstDataNode, secondDataNode}) {
        assertThat(Files.exists(shardDirectory(holder, index, shard)), equalTo(true));
        assertThat(Files.exists(indexDirectory(holder, index)), equalTo(true));
    }
    assertThat(Files.exists(shardDirectory(thirdDataNode, index, shard)), equalTo(false));
    assertThat(Files.exists(indexDirectory(thirdDataNode, index)), equalTo(false));

    logger.info("--> move shard from node_1 to node_3, and wait for relocation to finish");
    final MoveAllocationCommand relocation =
            new MoveAllocationCommand(new ShardId(index, shard), firstDataNode, thirdDataNode);
    internalCluster().client().admin().cluster().prepareReroute().add(relocation).get();
    final ClusterHealthResponse afterMove = client().admin().cluster().prepareHealth()
            .setWaitForNodes("4")
            .setWaitForRelocatingShards(0)
            .get();
    assertThat(afterMove.isTimedOut(), equalTo(false));

    // Both wait helpers report whether the directory still exists, so "false" means it was removed.
    assertThat(waitForShardDeletion(firstDataNode, index, shard), equalTo(false));
    assertThat(waitForIndexDeletion(firstDataNode, index), equalTo(false));
    for (String holder : new String[] {secondDataNode, thirdDataNode}) {
        assertThat(Files.exists(shardDirectory(holder, index, shard)), equalTo(true));
        assertThat(Files.exists(indexDirectory(holder, index)), equalTo(true));
    }
}

@Test
public void shardsCleanup() throws Exception {
final String node_1 = internalCluster().startNode();
Expand Down Expand Up @@ -115,34 +170,51 @@ public void shardsCleanup() throws Exception {

@Test
public void testShardActiveElseWhere() throws Exception {
String node_1 = internalCluster().startNode();
String node_2 = internalCluster().startNode();
boolean node1IsMasterEligible = randomBoolean();
boolean node2IsMasterEligible = !node1IsMasterEligible || randomBoolean();
Future<String> node_1_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node1IsMasterEligible).build());
Future<String> node_2_future = internalCluster().startNodeAsync(ImmutableSettings.builder().put("node.master", node2IsMasterEligible).build());
final String node_1 = node_1_future.get();
final String node_2 = node_2_future.get();
final String node_1_id = internalCluster().getInstance(DiscoveryService.class, node_1).localNode().getId();
final String node_2_id = internalCluster().getInstance(DiscoveryService.class, node_2).localNode().getId();

logger.debug("node {} (node_1) is {}master eligible", node_1, node1IsMasterEligible ? "" : "not ");
logger.debug("node {} (node_2) is {}master eligible", node_2, node2IsMasterEligible ? "" : "not ");
logger.debug("node {} became master", internalCluster().getMasterName());
final int numShards = scaledRandomIntBetween(2, 20);
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards))
);
ensureGreen("test");

waitNoPendingTasksOnAll();
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();

RoutingNode routingNode = stateResponse.getState().routingNodes().node(node_2_id);
int[] node2Shards = new int[routingNode.numberOfOwningShards()];
final int[] node2Shards = new int[routingNode.numberOfOwningShards()];
int i = 0;
for (MutableShardRouting mutableShardRouting : routingNode) {
node2Shards[i++] = mutableShardRouting.shardId().id();
node2Shards[i] = mutableShardRouting.shardId().id();
i++;
}
logger.info("Node 2 has shards: {}", Arrays.toString(node2Shards));
waitNoPendingTasksOnAll();
final long shardVersions[] = new long[numShards];
final int shardIds[] = new int[numShards];
i=0;
for (ShardRouting shardRouting : stateResponse.getState().getRoutingTable().allShards("test")) {
shardVersions[i] = shardRouting.version();
shardIds[i] = shardRouting.getId();
i++;
}
internalCluster().getInstance(ClusterService.class, node_2).submitStateUpdateTask("test", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test");
for (int i = 0; i < numShards; i++) {
indexRoutingTableBuilder.addIndexShard(
new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, 1))
.addShard(new ImmutableShardRouting("test", i, node_1_id, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]]))
.build()
);
}
Expand All @@ -166,6 +238,11 @@ public void onFailure(String source, Throwable t) {
}
}

/**
 * Resolves the directory of {@code index} on the node named {@code server}.
 *
 * @param server name of the test-cluster node whose {@link NodeEnvironment} is consulted
 * @param index  name of the index
 * @return the first entry of the index paths reported by that node's environment
 */
private Path indexDirectory(String server, String index) {
    return internalCluster()
            .getInstance(NodeEnvironment.class, server)
            .indexPaths(new Index(index))[0];
}

private Path shardDirectory(String server, String index, int shard) {
NodeEnvironment env = internalCluster().getInstance(NodeEnvironment.class, server);
return env.shardPaths(new ShardId(index, shard))[0];
Expand All @@ -181,5 +258,13 @@ public boolean apply(Object o) {
return Files.exists(shardDirectory(server, index, shard));
}


/**
 * Waits, via {@code awaitBusy}, for the directory of {@code index} on node {@code server} to
 * disappear, then reports the directory's state.
 *
 * @param server name of the test-cluster node to inspect
 * @param index  name of the index whose directory is expected to be removed
 * @return {@code true} if the index directory still exists after waiting, {@code false} if it is gone
 * @throws InterruptedException declared because {@code awaitBusy} is invoked
 */
private boolean waitForIndexDeletion(final String server, final String index) throws InterruptedException {
    final Predicate<Object> indexDirectoryGone = new Predicate<Object>() {
        @Override
        public boolean apply(Object ignored) {
            final boolean stillPresent = Files.exists(indexDirectory(server, index));
            return stillPresent == false;
        }
    };
    awaitBusy(indexDirectoryGone);
    // Re-check after waiting: the outcome of the wait itself is not used, only the final on-disk state.
    return Files.exists(indexDirectory(server, index));
}
}