@@ -83,6 +83,11 @@ protected String[] filterNodeIds(DiscoveryNodes nodes, String[] nodesIds) {
return nodesIds;
}

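// overridable hook: subclasses control how the requested node ids are resolved before the fan-out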
protected String[] resolveNodes(NodesRequest request, ClusterState clusterState) {
return clusterState.nodes().resolveNodesIds(request.nodesIds());
}


private class AsyncAction {

private final NodesRequest request;
@@ -96,7 +101,7 @@ private AsyncAction(NodesRequest request, ActionListener<NodesResponse> listener
this.request = request;
this.listener = listener;
clusterState = clusterService.state();
String[] nodesIds = clusterState.nodes().resolveNodesIds(request.nodesIds());
String[] nodesIds = resolveNodes(request, clusterState);
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<>(this.nodesIds.length);
}
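Taken together, the two hunks above introduce a template-method hook: node-id resolution now goes through an overridable resolveNodes method, so subclasses can opt out of the default filtering. A minimal sketch of the pattern, with hypothetical simplified names (NodesFanOutAction, KeepExactIdsAction, liveNodeIds) standing in for the real classes:

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, simplified fan-out action illustrating the hook added above.
abstract class NodesFanOutAction {
    // Default resolution: drop ids of nodes that are no longer part of the cluster.
    protected String[] resolveNodes(String[] requestedIds, Set<String> liveNodeIds) {
        return Arrays.stream(requestedIds).filter(liveNodeIds::contains).toArray(String[]::new);
    }

    final void start(String[] requestedIds, Set<String> liveNodeIds) {
        String[] nodesIds = resolveNodes(requestedIds, liveNodeIds);
        // one slot per resolved id; the caller expects exactly this many answers back
        AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(nodesIds.length);
        // ... fan out one request per id; every slot eventually holds a response or a failure ...
    }
}

// A subclass that keeps the requested ids verbatim, so a request aimed at a missing
// node surfaces as an explicit per-node failure instead of silently disappearing.
class KeepExactIdsAction extends NodesFanOutAction {
    @Override
    protected String[] resolveNodes(String[] requestedIds, Set<String> liveNodeIds) {
        return requestedIds;
    }
}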
@@ -165,19 +165,29 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData meta
protected synchronized void processAsyncFetch(ShardId shardId, T[] responses, FailedNodeException[] failures) {
if (closed) {
// we are closed, no need to process this async fetch at all
logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type);
return;
}
logger.trace("{} processing fetched [{}] results", shardId, type);

if (responses != null) {
for (T response : responses) {
NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
// if the entry is there, and not marked as failed already, process it
if (nodeEntry != null && nodeEntry.isFailed() == false) {
if (nodeEntry == null) {
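// no cache entry for this node id; nothing to mark for this round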
continue;
}
if (nodeEntry.isFailed()) {
logger.trace("{} node {} has failed for [{}] (failure [{}])", shardId, nodeEntry.getNodeId(), type, nodeEntry.getFailure());
} else {
logger.trace("{} marking {} as done for [{}]", shardId, nodeEntry.getNodeId(), type);
nodeEntry.doneFetching(response);
}
}
}
if (failures != null) {
for (FailedNodeException failure : failures) {
logger.trace("{} processing failure {} for [{}]", shardId, failure, type);
NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
// if the entry is there, and not marked as failed already, process it
if (nodeEntry != null && nodeEntry.isFailed() == false) {
@@ -253,6 +263,7 @@ private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
// visible for testing
void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) {
IndexMetaData indexMetaData = metaData.index(shardId.getIndex());
logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds);
action.list(shardId, indexMetaData, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
@@ -27,6 +27,7 @@
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@@ -68,6 +69,13 @@ public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds
execute(new Request(shardId, indexMetaData.getUUID(), nodesIds), listener);
}

@Override
protected String[] resolveNodes(Request request, ClusterState clusterState) {
// the default implementation may filter out non-existent nodes. it's important to keep exactly the ids
// we were given, so the caller can do its accounting against them
return request.nodesIds();
}

@Override
protected boolean transportCompress() {
return true; // this can become big...
@@ -27,6 +27,7 @@
import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
@@ -81,6 +82,13 @@ public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds
execute(new Request(shardId, false, nodesIds), listener);
}

@Override
protected String[] resolveNodes(Request request, ClusterState clusterState) {
// the default implementation may filter out non-existent nodes. it's important to keep exactly the ids
// we were given, so the caller can do its accounting against them
return request.nodesIds();
}

@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);
@@ -31,6 +31,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
@@ -46,6 +47,7 @@
public class MinimumMasterNodesTests extends ElasticsearchIntegrationTest {

@Test
@TestLogging("cluster.service:TRACE,discovery.zen:TRACE,gateway:TRACE,transport.tracer:TRACE")
public void simpleMinimumMasterNodes() throws Exception {

Settings settings = settingsBuilder()
@@ -20,24 +20,27 @@
package org.elasticsearch.indices.state;

import com.google.common.collect.ImmutableMap;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoverySettings;
@@ -52,19 +55,12 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.*;

/**
*/
@@ -102,6 +98,72 @@ public void testUnassignedShardAndEmptyNodesInRoutingTable() throws Exception {
allocator.allocateUnassigned(routingAllocation);
}

@Test
@TestLogging("gateway:TRACE")
public void testAssignmentWithJustAddedNodes() throws Exception {
internalCluster().startNode();
final String index = "index";
prepareCreate(index).setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).get();
ensureGreen(index);

// close the index to end up with some unassigned, previously started shards
client().admin().indices().prepareClose(index).get();


final String masterName = internalCluster().getMasterName();
final ClusterService clusterService = internalCluster().clusterService(masterName);
final AllocationService allocationService = internalCluster().getInstance(AllocationService.class, masterName);
clusterService.submitStateUpdateTask("test-inject-node-and-reroute", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// inject a node
ClusterState.Builder builder = ClusterState.builder(currentState);
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).put(new DiscoveryNode("_non_existent", DummyTransportAddress.INSTANCE, Version.CURRENT)));
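// nothing listens on DummyTransportAddress, so any shard-listing request sent to this node can only come back as a failure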

// open index
final IndexMetaData indexMetaData = IndexMetaData.builder(currentState.metaData().index(index)).state(IndexMetaData.State.OPEN).build();

builder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));
builder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).removeIndexBlocks(index));
ClusterState updatedState = builder.build();

RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
routingTable.addAsRecovery(updatedState.metaData().index(index));
updatedState = ClusterState.builder(updatedState).routingTable(routingTable).build();

RoutingAllocation.Result result = allocationService.reroute(updatedState);
return ClusterState.builder(updatedState).routingResult(result).build();

}

@Override
public void onFailure(String source, Throwable t) {

}
});
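// the green check below is the crux of the test: allocation must still complete even though the injected node can never respond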
ensureGreen(index);
// remove the extra node
clusterService.submitStateUpdateTask("test-remove-injected-node", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// remove the injected node
ClusterState.Builder builder = ClusterState.builder(currentState);
builder.nodes(DiscoveryNodes.builder(currentState.nodes()).remove("_non_existent"));

currentState = builder.build();
RoutingAllocation.Result result = allocationService.reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();

}

@Override
public void onFailure(String source, Throwable t) {

}
});
}


@Test
@TestLogging(value = "cluster.service:TRACE")
public void testDeleteCreateInOneBulk() throws Exception {
@@ -190,6 +252,7 @@ public void testDelayedMappingPropagationOnPrimary() throws Exception {
public void onResponse(PutMappingResponse response) {
putMappingResponse.set(response);
}

@Override
public void onFailure(Throwable e) {
putMappingResponse.set(e);
@@ -221,6 +284,7 @@ public void run() {
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}

@Override
public void onFailure(Throwable e) {
docIndexResponse.set(e);
@@ -304,6 +368,7 @@ public void testDelayedMappingPropagationOnReplica() throws Exception {
public void onResponse(PutMappingResponse response) {
putMappingResponse.set(response);
}

@Override
public void onFailure(Throwable e) {
putMappingResponse.set(e);
@@ -329,6 +394,7 @@ public void run() {
public void onResponse(IndexResponse response) {
docIndexResponse.set(response);
}

@Override
public void onFailure(Throwable e) {
docIndexResponse.set(e);