Skip to content

Commit

Permalink
Remove settings from AllocationService and directly inject in Gateway…
Browse files Browse the repository at this point in the history
…Allocator
  • Loading branch information
dreamer-89 committed Aug 9, 2022
1 parent 907df84 commit a62d02c
Show file tree
Hide file tree
Showing 23 changed files with 58 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ public static AllocationService createAllocationService(Settings settings, Clust
NoopGatewayAllocator.INSTANCE,
new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,7 @@ public ClusterModule(
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
settings
);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
}

public static List<Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.snapshots.SnapshotsInfoService;
Expand Down Expand Up @@ -93,33 +92,28 @@ public class AllocationService {
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;

private final Settings settings;

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
AllocationDeciders allocationDeciders,
GatewayAllocator gatewayAllocator,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, settings);
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
Settings settings
SnapshotsInfoService snapshotsInfoService
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.settings = settings;
}

/**
Expand Down Expand Up @@ -225,7 +219,6 @@ public ClusterState applyFailedShards(
return clusterState;
}
ClusterState tmpState = IndexMetadataUpdater.removeStaleIdsWithoutRoutings(clusterState, staleShards, logger);
// clusterState.getMetadata().index(failedShards.get(0).getMessage())

RoutingNodes routingNodes = getMutableRoutingNodes(tmpState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ private static boolean isResponsibleFor(final ShardRouting shard) {
|| shard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT);
}

Settings settings;

@Override
public AllocateUnassignedDecision makeAllocationDecision(
final ShardRouting unassignedShard,
Expand Down Expand Up @@ -129,9 +127,8 @@ public AllocateUnassignedDecision makeAllocationDecision(
// don't create a new IndexSetting object for every shard as this could cause a lot of garbage
// on cluster restart if we allocate a boat load of shards
final IndexMetadata indexMetadata = allocation.metadata().getIndexSafe(unassignedShard.index());
final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings);
final IndexSettings indexSettings = settings != null ? new IndexSettings(indexMetadata, settings) : null;
final Set<String> inSyncAllocationIds = indexMetadata.inSyncAllocationIds(unassignedShard.id());
// final IndexSettings indexSettings = indexMetadata.getIndex().
final boolean snapshotRestore = unassignedShard.recoverySource().getType() == RecoverySource.Type.SNAPSHOT;

assert inSyncAllocationIds.isEmpty() == false;
Expand Down Expand Up @@ -407,7 +404,7 @@ protected static NodeShardsResult buildNodeShardsResult(
}

// If index has segrep enabled, then use replication checkpoint info to order the replicas
if (indexSettings.isSegRepEnabled()) {
if (indexSettings != null && indexSettings.isSegRepEnabled()) {
comparator = comparator.thenComparing(HIGHEST_REPLICATION_FIRST_CHECKPOINT_COMPARATOR);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,9 @@ public String getCustomDataPath() {
* @opensearch.internal
*/
public static class NodeGatewayStartedShards extends BaseNodeResponse {

private final String allocationId;
private final boolean primary;

private final Exception storeException;

private final ReplicationCheckpoint replicationCheckpoint;

public NodeGatewayStartedShards(StreamInput in) throws IOException {
Expand Down Expand Up @@ -435,11 +432,13 @@ public void writeTo(StreamOutput out) throws IOException {
} else {
out.writeBoolean(false);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0) && replicationCheckpoint != null) {
out.writeBoolean(true);
replicationCheckpoint.writeTo(out);
} else {
out.writeBoolean(false);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (replicationCheckpoint != null) {
out.writeBoolean(true);
replicationCheckpoint.writeTo(out);
} else {
out.writeBoolean(false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
throw new UnsupportedOperationException("cannot explain");
}
}
}, null, null, null)
}, null, null)
);

assertEquals(shard.currentNodeId(), cae.getCurrentNode().getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public void testClusterStateUpdateTask() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
ClusterState clusterState = createInitialClusterState(allocationService);
ClusterRerouteRequest req = new ClusterRerouteRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,7 @@ public void testErrorCondition() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand All @@ -182,8 +181,7 @@ public void testPassNumRoutingShards() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand Down Expand Up @@ -214,8 +212,7 @@ public void testPassNumRoutingShardsAndFail() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand Down Expand Up @@ -252,8 +249,7 @@ public void testShrinkIndexSettings() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void testClusterHealthWaitsForClusterStateApplication() throws Interrupte
threadPool,
new ActionFilters(new HashSet<>()),
indexNameExpressionResolver,
new AllocationService(null, new TestGatewayAllocator(), null, null, null, null)
new AllocationService(null, new TestGatewayAllocator(), null, null, null)
);
PlainActionFuture<ClusterHealthResponse> listener = new PlainActionFuture<>();
action.execute(new ClusterHealthRequest().waitForGreenStatus(), listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ public void testValidateShrinkIndex() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand Down Expand Up @@ -407,8 +406,7 @@ public void testValidateSplitIndex() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

RoutingTable routingTable = service.reroute(clusterState, "reroute").routingTable();
Expand Down Expand Up @@ -548,8 +546,7 @@ private void runPrepareResizeIndexSettingsTest(
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

final RoutingTable initialRoutingTable = service.reroute(initialClusterState, "reroute").routingTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}
},
new EmptyClusterInfoService(),
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);

final String unrealisticAllocatorName = "unrealistic";
Expand Down Expand Up @@ -263,7 +262,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}

public void testExplainsNonAllocationOfShardWithUnknownAllocator() {
final AllocationService allocationService = new AllocationService(null, null, null, null, null);
final AllocationService allocationService = new AllocationService(null, null, null, null);
allocationService.setExistingShardsAllocators(
Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}
},
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
Metadata.Builder metadataBuilder = Metadata.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ private static AllocationService newAllocationService(Settings settings, Set<All
new TestGatewayAllocator(),
new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ public void setUp() throws Exception {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,7 @@ public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNode
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState();
// the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
Expand Down Expand Up @@ -518,8 +517,7 @@ public void testRestoreDoesNotAllocateSnapshotOnOlderNodes() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
() -> new SnapshotShardSizeInfo(snapshotShardSizes.build()),
Settings.EMPTY
() -> new SnapshotShardSizeInfo(snapshotShardSizes.build())
);
state = strategy.reroute(state, new AllocationCommands(), true, false).getClusterState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public void testRandomDecisions() {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
int indices = scaledRandomIntBetween(1, 20);
Builder metaBuilder = Metadata.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public void setUp() throws Exception {
new TestGatewayAllocator(),
new BalancedShardsAllocator(Settings.EMPTY),
EmptyClusterInfoService.INSTANCE,
EmptySnapshotsInfoService.INSTANCE,
Settings.EMPTY
EmptySnapshotsInfoService.INSTANCE
);
}

Expand Down
Loading

0 comments on commit a62d02c

Please sign in to comment.