Skip to content

Auto-allocate searchable snapshots #52527

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a524179
Auto-allocate searchable snapshots
DaveCTurner Feb 19, 2020
44df860
Rename
DaveCTurner Feb 19, 2020
1015ac8
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Feb 19, 2020
438ec61
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Feb 19, 2020
41071b2
Select a unique allocator for each shard
DaveCTurner Feb 20, 2020
9d1da96
Didn't know I edited that
DaveCTurner Feb 20, 2020
ab39f0a
Revert sorting by primary/replica since we have to iterate twice anyway
DaveCTurner Feb 21, 2020
66ca063
Revert some unnecessary changes
DaveCTurner Feb 21, 2020
02e6158
Add tests for duplicate checks of existing shards allocators
DaveCTurner Feb 21, 2020
5154f7f
We always take a decision right now
DaveCTurner Feb 21, 2020
e61a342
Add test of allocation service behaviour
DaveCTurner Feb 21, 2020
aad3fc0
Add test for unknown allocator too
DaveCTurner Feb 21, 2020
09c49b8
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Feb 21, 2020
f1228d6
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Mar 2, 2020
7682438
Reformat
DaveCTurner Mar 2, 2020
c9b84ab
Consistent use of loops
DaveCTurner Mar 2, 2020
6cd0917
Improve comment
DaveCTurner Mar 2, 2020
e6e5b7e
Use null rather than empty for unset
DaveCTurner Mar 2, 2020
c7c1701
Extract interface
DaveCTurner Mar 2, 2020
a72d9ba
Consistent argument order
DaveCTurner Mar 2, 2020
3301a2b
More specific method name
DaveCTurner Mar 2, 2020
4b8888c
Capture plugins at init time
DaveCTurner Mar 2, 2020
ac303eb
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Mar 11, 2020
01c9b33
Include plugin class name in error
DaveCTurner Mar 11, 2020
1857794
Whitespace
DaveCTurner Mar 11, 2020
41fa9c6
Reorder params
DaveCTurner Mar 11, 2020
4c9d3ee
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Mar 11, 2020
05b690c
Merge branch 'feature/searchable-snapshots' into 2020-02-19-pluggable…
DaveCTurner Mar 11, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,21 @@ private static class NoopGatewayAllocator extends GatewayAllocator {
public static final NoopGatewayAllocator INSTANCE = new NoopGatewayAllocator();

@Override
public void applyStartedShards(RoutingAllocation allocation, List<ShardRouting> startedShards) {
public void applyStartedShards(List<ShardRouting> startedShards, RoutingAllocation allocation) {
// noop
}

@Override
public void applyFailedShards(RoutingAllocation allocation, List<FailedShard> failedShards) {
public void applyFailedShards(List<FailedShard> failedShards, RoutingAllocation allocation) {
// noop
}

@Override
public void allocateUnassigned(RoutingAllocation allocation) {
public void allocateUnassigned(
ShardRouting shardRouting,
RoutingAllocation allocation,
UnassignedAllocationHandler unassignedAllocationHandler
) {
// noop
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.MoveDecision;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation.DebugMode;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
Expand All @@ -43,7 +42,6 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -63,20 +61,20 @@ public class TransportClusterAllocationExplainAction
private final ClusterInfoService clusterInfoService;
private final AllocationDeciders allocationDeciders;
private final ShardsAllocator shardAllocator;
private final GatewayAllocator gatewayAllocator;
private final AllocationService allocationService;

@Inject
public TransportClusterAllocationExplainAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterInfoService clusterInfoService, AllocationDeciders allocationDeciders,
ShardsAllocator shardAllocator, GatewayAllocator gatewayAllocator) {
ShardsAllocator shardAllocator, AllocationService allocationService) {
super(ClusterAllocationExplainAction.NAME, transportService, clusterService, threadPool, actionFilters,
ClusterAllocationExplainRequest::new, indexNameExpressionResolver);
this.clusterInfoService = clusterInfoService;
this.allocationDeciders = allocationDeciders;
this.shardAllocator = shardAllocator;
this.gatewayAllocator = gatewayAllocator;
this.allocationService = allocationService;
}

@Override
Expand Down Expand Up @@ -106,27 +104,21 @@ protected void masterOperation(Task task, final ClusterAllocationExplainRequest
logger.debug("explaining the allocation for [{}], found shard [{}]", request, shardRouting);

ClusterAllocationExplanation cae = explainShard(shardRouting, allocation,
request.includeDiskInfo() ? clusterInfo : null, request.includeYesDecisions(), gatewayAllocator, shardAllocator);
request.includeDiskInfo() ? clusterInfo : null, request.includeYesDecisions(), allocationService);
listener.onResponse(new ClusterAllocationExplainResponse(cae));
}

// public for testing
public static ClusterAllocationExplanation explainShard(ShardRouting shardRouting, RoutingAllocation allocation,
ClusterInfo clusterInfo, boolean includeYesDecisions,
GatewayAllocator gatewayAllocator, ShardsAllocator shardAllocator) {
AllocationService allocationService) {
allocation.setDebugMode(includeYesDecisions ? DebugMode.ON : DebugMode.EXCLUDE_YES_DECISIONS);

ShardAllocationDecision shardDecision;
if (shardRouting.initializing() || shardRouting.relocating()) {
shardDecision = ShardAllocationDecision.NOT_TAKEN;
} else {
AllocateUnassignedDecision allocateDecision = shardRouting.unassigned() ?
gatewayAllocator.decideUnassignedShardAllocation(shardRouting, allocation) : AllocateUnassignedDecision.NOT_TAKEN;
if (allocateDecision.isDecisionTaken() == false) {
shardDecision = shardAllocator.decideShardAllocation(shardRouting, allocation);
} else {
shardDecision = new ShardAllocationDecision(allocateDecision, MoveDecision.NOT_TAKEN);
}
shardDecision = allocationService.explainShardAllocation(shardRouting, allocation);
}

return new ClusterAllocationExplanation(shardRouting,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -55,15 +55,15 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<

private static final Logger logger = LogManager.getLogger(TransportClusterHealthAction.class);

private final GatewayAllocator gatewayAllocator;
private final AllocationService allocationService;

@Inject
public TransportClusterHealthAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) {
IndexNameExpressionResolver indexNameExpressionResolver, AllocationService allocationService) {
super(ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
ClusterHealthRequest::new, indexNameExpressionResolver);
this.gatewayAllocator = gatewayAllocator;
this.allocationService = allocationService;
}

@Override
Expand Down Expand Up @@ -229,14 +229,14 @@ private static int getWaitCount(ClusterHealthRequest request) {

private boolean validateRequest(final ClusterHealthRequest request, ClusterState clusterState, final int waitCount) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
return prepareResponse(request, response, clusterState, indexNameExpressionResolver) == waitCount;
}

private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
final int waitFor, boolean timedOut) {
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
gatewayAllocator.getNumberOfInFlightFetch(), clusterService.getMasterService().getMaxTaskWaitTime());
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
boolean valid = (readyCounter == waitFor);
assert valid || timedOut;
Expand Down
21 changes: 21 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
Expand Down Expand Up @@ -97,12 +98,14 @@ public class ClusterModule extends AbstractModule {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final AllocationDeciders allocationDeciders;
private final AllocationService allocationService;
private final List<ClusterPlugin> clusterPlugins;
// pkg private for tests
final Collection<AllocationDecider> deciderList;
final ShardsAllocator shardsAllocator;

public ClusterModule(Settings settings, ClusterService clusterService, List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
Expand Down Expand Up @@ -251,4 +254,22 @@ protected void configure() {
bind(AllocationDeciders.class).toInstance(allocationDeciders);
bind(ShardsAllocator.class).toInstance(shardsAllocator);
}

/**
 * Builds the name-to-allocator registry of {@link ExistingShardsAllocator}s and passes it to the
 * {@link AllocationService}.
 *
 * The given gateway allocator is always registered first, under {@link GatewayAllocator#ALLOCATOR_NAME}.
 * Every allocator returned by each cluster plugin's {@code getExistingShardsAllocators()} is then added
 * under the name the plugin supplied.
 *
 * @param gatewayAllocator the allocator to register under {@link GatewayAllocator#ALLOCATOR_NAME}
 * @throws IllegalArgumentException if a plugin supplies an allocator name that is already registered,
 *         whether by the gateway allocator or by an earlier plugin entry
 */
public void setExistingShardsAllocators(GatewayAllocator gatewayAllocator) {
    final Map<String, ExistingShardsAllocator> allocatorsByName = new HashMap<>();
    allocatorsByName.put(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator);

    for (ClusterPlugin plugin : clusterPlugins) {
        for (Map.Entry<String, ExistingShardsAllocator> entry : plugin.getExistingShardsAllocators().entrySet()) {
            final String name = entry.getKey();
            // put() returns the previous mapping; a non-null result means the name was already taken.
            final ExistingShardsAllocator previous = allocatorsByName.put(name, entry.getValue());
            if (previous == null) {
                continue;
            }
            throw new IllegalArgumentException("ExistingShardsAllocator [" + name + "] from [" +
                plugin.getClass().getName() + "] was already defined");
        }
    }

    allocationService.setExistingShardsAllocators(allocatorsByName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.UnassignedInfo.AllocationStatus;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -883,7 +884,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R
ignored.add(shard);
}

public class UnassignedIterator implements Iterator<ShardRouting> {
public class UnassignedIterator implements Iterator<ShardRouting>, ExistingShardsAllocator.UnassignedAllocationHandler {

private final ListIterator<ShardRouting> iterator;
private ShardRouting current;
Expand All @@ -907,6 +908,7 @@ public ShardRouting next() {
*
* @param existingAllocationId allocation id to use. If null, a fresh allocation id is generated.
*/
@Override
public ShardRouting initialize(String nodeId, @Nullable String existingAllocationId, long expectedShardSize,
RoutingChangesObserver routingChangesObserver) {
nodes.ensureMutable();
Expand All @@ -922,6 +924,7 @@ public ShardRouting initialize(String nodeId, @Nullable String existingAllocatio
*
* @param attempt the result of the allocation attempt
*/
@Override
public void removeAndIgnore(AllocationStatus attempt, RoutingChangesObserver changes) {
nodes.ensureMutable();
innerRemove();
Expand All @@ -940,6 +943,7 @@ private void updateShardRouting(ShardRouting shardRouting) {
* @param recoverySource the new recovery source to use
* @return the shard with unassigned info updated
*/
@Override
public ShardRouting updateUnassigned(UnassignedInfo unassignedInfo, RecoverySource recoverySource,
RoutingChangesObserver changes) {
nodes.ensureMutable();
Expand Down
Loading