FetchData changes for primaries and replicas #8865

Closed
111 changes: 108 additions & 3 deletions server/src/main/java/org/opensearch/gateway/GatewayAllocator.java
@@ -55,9 +55,15 @@
import org.opensearch.common.util.set.Sets;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
+import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;

Typo: stray `+` at the beginning of the line.

Contributor Author

ack



import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
@@ -78,11 +84,13 @@ public class GatewayAllocator implements ExistingShardsAllocator {

private final PrimaryShardAllocator primaryShardAllocator;
private final ReplicaShardAllocator replicaShardAllocator;
private final PrimaryShardBatchAllocator primaryBatchShardAllocator;
private final ReplicaShardBatchAllocator replicaBatchShardAllocator;
Comment on lines +87 to +88

Let us stay consistent in our naming. The 'Batch' and 'Shard' in class name and variable name are inverted.
I prefer ShardBatch.

Contributor Author

ack, will update this later once PRs/tasks for Allocators are merged/approved to avoid any back and forth


private final ConcurrentMap<
ShardId,
AsyncShardFetch<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
.newConcurrentMap();
.newConcurrentMap();

nit: Fix the syntax. Add back the tab spacing.

Contributor Author

ack

private final ConcurrentMap<ShardId, AsyncShardFetch<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata>> asyncFetchStore =
ConcurrentCollections.newConcurrentMap();
private Set<String> lastSeenEphemeralIds = Collections.emptySet();
@@ -96,6 +104,8 @@ public GatewayAllocator(
this.rerouteService = rerouteService;
this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction);
this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction);
this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator();
this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator();
}

@Override
@@ -303,6 +313,57 @@ protected AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.Nod
}
}

class InternalPrimaryBatchShardAllocator extends PrimaryShardBatchAllocator {

Let us fix the naming here as well.

Contributor Author

ack, same comment as above

@Override
@SuppressWarnings("unchecked")
protected AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {
ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}

String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
Contributor

If the shard was started or failed in between, we may get null here. Should we iterate over all eligible shards to get the batchId? Relying on the first one may be incorrect.

Contributor Author

Not possible; this is a single-threaded system.
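
For illustration only, the reviewer's suggestion of checking every shard rather than only the first could be sketched as below. This is not code from the PR. It assumes shard ids are plain strings and that the lookup is an ordinary map; `BatchIdLookupSketch` and `findBatchId` are hypothetical names. Per the author's reply, the null case may never arise in practice, so this only shows the shape of the alternative.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical stand-in: shard ids are Strings here, not org.opensearch ShardRouting/ShardId.
final class BatchIdLookupSketch {

    // Returns the first batch id found for any shard, checking eligible shards
    // before ineligible ones, or Optional.empty() if no shard has a batch id.
    static Optional<String> findBatchId(Set<String> eligible, Set<String> ineligible, Map<String, String> batchLookup) {
        return Stream.concat(eligible.stream(), ineligible.stream())
            .map(batchLookup::get)       // null when the shard is not in any batch
            .filter(Objects::nonNull)    // skip shards without a batch id
            .findFirst();
    }

    public static void main(String[] args) {
        Set<String> eligible = new LinkedHashSet<>();
        eligible.add("shard-0"); // no batch id registered for this one
        eligible.add("shard-1");
        Map<String, String> batchLookup = Map.of("shard-1", "batch-1");
        System.out.println(findBatchId(eligible, Set.of(), batchLookup));
    }
}
```

With this shape, a single shard missing from the lookup does not trigger the IllegalStateException as long as some other shard in the same call still maps to a batch.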

if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching");

nit: Shard " + shardRouting + " has no batch id. Shard must be batched before fetch operation

Contributor Author

ack

}


if (batchIdToStartedShardBatch.containsKey(batchId) == false) {

Any reason we want the explicit boolean comparison `== false` in place of `!batchIdToStartedShardBatch.containsKey(batchId)`?

Contributor Author

Yes, as I replied in previous PRs, I see this convention in this file and in other files.

logger.debug("Batch {} has no started shard batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no started shard batch");
}

ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId);
// remove ineligible shards that the allocator is not responsible for
inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch);

if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}

Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();

for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));

Can the shardToIgnoreNodes map have empty (set) values? Can we skip adding the entry in such cases?

Ref:

public Set<String> getIgnoreNodes(ShardId shardId) {
if (ignoredShardToNodes == null) {
return emptySet();
}
Set<String> ignore = ignoredShardToNodes.get(shardId);
if (ignore == null) {
return emptySet();
}

Contributor Author

Even if we skip it, the entry will be created later by the AsyncShardFetch object for completeness' sake.


Can you link the code where we are adding the entries with an empty set?

Is there scope to optimize here, i.e. avoid creating empty sets that serve no purpose?
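
As a rough sketch of the optimization being asked about (not code from this PR), the loop could skip shards whose ignore set is empty. It assumes the consumer treats a missing entry the same as an empty set, which the author's reply suggests may not hold since the fetcher reportedly recreates the entry. Shard ids are plain strings here, and `IgnoreNodesSketch` and `nonEmptyIgnoreNodes` are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in: shard ids are Strings here, not org.opensearch ShardId.
final class IgnoreNodesSketch {

    // Builds the shard -> ignored-node-ids map, omitting shards with nothing to ignore.
    static Map<String, Set<String>> nonEmptyIgnoreNodes(
        Iterable<String> shardIds,
        Function<String, Set<String>> getIgnoreNodes
    ) {
        Map<String, Set<String>> shardToIgnoreNodes = new HashMap<>();
        for (String shardId : shardIds) {
            Set<String> ignored = getIgnoreNodes.apply(shardId);
            if (ignored.isEmpty() == false) { // same `== false` convention as the PR
                shardToIgnoreNodes.put(shardId, ignored);
            }
        }
        return shardToIgnoreNodes;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> ignoredByShard = Map.of("shard-0", Set.of("node-a"));
        Map<String, Set<String>> result = nonEmptyIgnoreNodes(
            List.of("shard-0", "shard-1"),
            id -> ignoredByShard.getOrDefault(id, Set.of())
        );
        System.out.println(result);
    }
}
```

Whether this saves anything depends on the fetcher: if it fills in missing entries anyway, as the author states, the map ends up the same size and the filter only moves the allocation.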

}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
allocation.nodes(),
shardToIgnoreNodes
);

if (shardBatchState.hasData()) {

In what scenario will shardBatchState not have data? Should we add a log statement for it?

Contributor Author

While fetching is still in progress, or when it has failed.

What do you want to log?

Contributor

Should we add a log statement for it?

It'll start creating too many logs, let's avoid that.

shardBatchState.processAllocation(allocation);
}
return (AsyncBatchShardFetch.FetchResult<TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch>) shardBatchState;
}
}

class InternalReplicaShardAllocator extends ReplicaShardAllocator {

private final TransportNodesListShardStoreMetadata storeAction;
@@ -335,10 +396,54 @@ protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeS
}
return shardStores;
}
}

class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator {
Member

Can you implement hasInitiatedFetching function in this class or should I pick this whole Internal class in my PR?


@Override
protected boolean hasInitiatedFetching(ShardRouting shard) {
return asyncFetchStore.get(shard.shardId()) != null;
@SuppressWarnings("unchecked")
protected AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> fetchData(Set<ShardRouting> shardsEligibleForFetch,
Set<ShardRouting> inEligibleShards,
RoutingAllocation allocation) {

ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null;
shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting;
if (shardRouting == null) {
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}

String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null);
if (batchId == null) {
logger.debug("Shard {} has no batch id", shardRouting);
throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should be batched before fetching");
}

if (batchIdToStoreShardBatch.containsKey(batchId) == false) {
logger.debug("Batch {} has no store shard batch", batchId);
throw new IllegalStateException("Batch " + batchId + " has no shard store batch");
}

ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId);
// remove ineligible shards that the allocator is not responsible for
inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch);

if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) {
logger.debug("Batch {} is empty", batchId);
return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap());
}
Map<ShardId, Set<String>> shardToIgnoreNodes = new HashMap<>();
for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) {
shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId));
}
AsyncBatchShardFetch<? extends BaseNodeResponse> asyncFetcher = shardsBatch.getAsyncFetcher();
AsyncBatchShardFetch.FetchResult<? extends BaseNodeResponse> shardBatchState = asyncFetcher.fetchData(
Member

Can you rename this variable to shardBatchStore to represent that this contains shard store address of primary shard?

Contributor Author

The Store suffix is used in conjunction with replicas in the code base.

allocation.nodes(),
shardToIgnoreNodes
);
if (shardBatchState.hasData()) {
shardBatchState.processAllocation(allocation);
}
return (AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch>) shardBatchState;
}
}
}