-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FetchData changes for primaries and replicas #8865
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -55,9 +55,15 @@ | |||||||||||||||||
import org.opensearch.common.util.set.Sets; | ||||||||||||||||||
import org.opensearch.index.shard.ShardId; | ||||||||||||||||||
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; | ||||||||||||||||||
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; | ||||||||||||||||||
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; | ||||||||||||||||||
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
import java.util.Collections; | ||||||||||||||||||
import java.util.HashMap; | ||||||||||||||||||
import java.util.List; | ||||||||||||||||||
import java.util.Map; | ||||||||||||||||||
import java.util.Set; | ||||||||||||||||||
import java.util.concurrent.ConcurrentMap; | ||||||||||||||||||
import java.util.stream.Collectors; | ||||||||||||||||||
|
@@ -78,11 +84,13 @@ public class GatewayAllocator implements ExistingShardsAllocator { | |||||||||||||||||
|
||||||||||||||||||
private final PrimaryShardAllocator primaryShardAllocator; | ||||||||||||||||||
private final ReplicaShardAllocator replicaShardAllocator; | ||||||||||||||||||
private final PrimaryShardBatchAllocator primaryBatchShardAllocator; | ||||||||||||||||||
private final ReplicaShardBatchAllocator replicaBatchShardAllocator; | ||||||||||||||||||
Comment on lines
+87
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us stay consistent in our naming. The 'Batch' and 'Shard' in class name and variable name are inverted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack, will update this later once PRs/tasks for Allocators are merged/approved to avoid any back and forth |
||||||||||||||||||
|
||||||||||||||||||
private final ConcurrentMap< | ||||||||||||||||||
ShardId, | ||||||||||||||||||
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections | ||||||||||||||||||
.newConcurrentMap(); | ||||||||||||||||||
.newConcurrentMap(); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Fix the syntax. Add back the tab spacing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack |
||||||||||||||||||
private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata>> asyncFetchStore = | ||||||||||||||||||
ConcurrentCollections.newConcurrentMap(); | ||||||||||||||||||
private Set<String> lastSeenEphemeralIds = Collections.emptySet(); | ||||||||||||||||||
|
@@ -96,6 +104,8 @@ public GatewayAllocator( | |||||||||||||||||
this.rerouteService = rerouteService; | ||||||||||||||||||
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); | ||||||||||||||||||
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); | ||||||||||||||||||
this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator(); | ||||||||||||||||||
this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator(); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
@Override | ||||||||||||||||||
|
@@ -303,6 +313,57 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod | |||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
class InternalPrimaryBatchShardAllocator extends PrimaryShardBatchAllocator { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us fix the naming here as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack, same comment as above |
||||||||||||||||||
@Override | ||||||||||||||||||
@SuppressWarnings("unchecked") | ||||||||||||||||||
protected AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch, | ||||||||||||||||||
Set<ShardRouting> inEligibleShards, | ||||||||||||||||||
RoutingAllocation allocation) { | ||||||||||||||||||
ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; | ||||||||||||||||||
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; | ||||||||||||||||||
if (shardRouting == null) { | ||||||||||||||||||
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If a shard was started or failed in between, we may get null here. So should we iterate over all eligible shards to get the batchId? Relying on the first one may be incorrect. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not possible: this is a single-threaded system. |
||||||||||||||||||
if (batchId == null) { | ||||||||||||||||||
logger.debug("Shard {} has no batch id", shardRouting); | ||||||||||||||||||
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ack |
||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
||||||||||||||||||
if (batchIdToStartedShardBatch.containsKey(batchId) == false) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Any reason we want the explicit boolean comparison? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, I replied to this in previous PRs; I see this convention in this file and in other files. |
||||||||||||||||||
logger.debug("Batch {} has no started shard batch", batchId); | ||||||||||||||||||
throw new IllegalStateException("Batch " + batchId + " has no started shard batch"); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId); | ||||||||||||||||||
// remove in eligible shards which allocator is not responsible for | ||||||||||||||||||
inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); | ||||||||||||||||||
|
||||||||||||||||||
if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { | ||||||||||||||||||
logger.debug("Batch {} is empty", batchId); | ||||||||||||||||||
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>(); | ||||||||||||||||||
|
||||||||||||||||||
for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { | ||||||||||||||||||
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the Ref: OpenSearch/server/src/main/java/org/opensearch/cluster/routing/allocation/RoutingAllocation.java Lines 241 to 248 in f7f3500
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Even if we ignore it, it will be created later by the AsyncShardFetch object for completeness' sake. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you link the code where we are adding the entries with an empty set? Is there scope to optimize here, i.e. avoid creating the empty sets that serve no purpose? |
||||||||||||||||||
} | ||||||||||||||||||
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher(); | ||||||||||||||||||
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData( | ||||||||||||||||||
allocation.nodes(), | ||||||||||||||||||
shardToIgnoreNodes | ||||||||||||||||||
); | ||||||||||||||||||
|
||||||||||||||||||
if (shardBatchState.hasData()) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In what scenario will There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While the fetching is still in progress/failure. What you want to log? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It'll start creating too many logs, let's avoid that. |
||||||||||||||||||
shardBatchState.processAllocation(allocation); | ||||||||||||||||||
} | ||||||||||||||||||
return (AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) shardBatchState; | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
class InternalReplicaShardAllocator extends ReplicaShardAllocator { | ||||||||||||||||||
|
||||||||||||||||||
private final TransportNodesListShardStoreMetadata storeAction; | ||||||||||||||||||
|
@@ -335,10 +396,54 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS | |||||||||||||||||
} | ||||||||||||||||||
return shardStores; | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you implement |
||||||||||||||||||
|
||||||||||||||||||
@Override | ||||||||||||||||||
protected boolean hasInitiatedFetching(ShardRouting shard) { | ||||||||||||||||||
return asyncFetchStore.get(shard.shardId()) != null; | ||||||||||||||||||
@SuppressWarnings("unchecked") | ||||||||||||||||||
protected AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch, | ||||||||||||||||||
Set<ShardRouting> inEligibleShards, | ||||||||||||||||||
RoutingAllocation allocation) { | ||||||||||||||||||
|
||||||||||||||||||
ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; | ||||||||||||||||||
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; | ||||||||||||||||||
if (shardRouting == null) { | ||||||||||||||||||
return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap()); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null); | ||||||||||||||||||
if (batchId == null) { | ||||||||||||||||||
logger.debug("Shard {} has no batch id", shardRouting); | ||||||||||||||||||
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
if (batchIdToStoreShardBatch.containsKey(batchId) == false) { | ||||||||||||||||||
logger.debug("Batch {} has no store shard batch", batchId); | ||||||||||||||||||
throw new IllegalStateException("Batch " + batchId + " has no shard store batch"); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId); | ||||||||||||||||||
// remove in eligible shards which allocator is not responsible for | ||||||||||||||||||
inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); | ||||||||||||||||||
|
||||||||||||||||||
if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { | ||||||||||||||||||
logger.debug("Batch {} is empty", batchId); | ||||||||||||||||||
return new AsyncBatchShardFetch<>.FetchResult<>(null, Collections.emptyMap()); | ||||||||||||||||||
} | ||||||||||||||||||
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>(); | ||||||||||||||||||
for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { | ||||||||||||||||||
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); | ||||||||||||||||||
} | ||||||||||||||||||
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher(); | ||||||||||||||||||
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData( | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you rename this variable to shardBatchStore to represent that this contains the shard store address of the primary shard? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Store suffix is used in conjunction with replicas in the code base. |
||||||||||||||||||
allocation.nodes(), | ||||||||||||||||||
shardToIgnoreNodes | ||||||||||||||||||
); | ||||||||||||||||||
if (shardBatchState.hasData()) { | ||||||||||||||||||
shardBatchState.processAllocation(allocation); | ||||||||||||||||||
} | ||||||||||||||||||
return (AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch>) shardBatchState; | ||||||||||||||||||
} | ||||||||||||||||||
} | ||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+
typo at the beginning of line.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack