Skip to content

Commit

Permalink
Merge branch 'main' into float_object_doc_value
Browse files Browse the repository at this point in the history
Signed-off-by: kkewwei <kewei.11@bytedance.com>
  • Loading branch information
kkewwei authored Dec 16, 2024
2 parents a41d71e + b359dd8 commit 5714d79
Show file tree
Hide file tree
Showing 10 changed files with 1,157 additions and 40 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)).
- Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/))
- Added a precaution to handle extreme date values during sorting to prevent `arithmetic_exception: long overflow` ([#16812](https://github.com/opensearch-project/OpenSearch/pull/16812)).
- Add search replica stats to segment replication stats API ([#16678](https://github.com/opensearch-project/OpenSearch/pull/16678))
- Added ability to retrieve value from DocValues in a flat_object filed([#16802](https://github.com/opensearch-project/OpenSearch/pull/16802))

### Dependencies
Expand Down Expand Up @@ -67,6 +68,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bound the size of cache in deprecation logger ([16702](https://github.com/opensearch-project/OpenSearch/issues/16702))
- Ensure consistency of system flag on IndexMetadata after diff is applied ([#16644](https://github.com/opensearch-project/OpenSearch/pull/16644))
- Skip remote-repositories validations for node-joins when RepositoriesService is not in sync with cluster-state ([#16763](https://github.com/opensearch-project/OpenSearch/pull/16763))
- Fix _list/shards API failing when closed indices are present ([#16606](https://github.com/opensearch-project/OpenSearch/pull/16606))
- Fix remote shards balance ([#15335](https://github.com/opensearch-project/OpenSearch/pull/15335))

### Security

Expand Down
1 change: 1 addition & 0 deletions release-notes/opensearch.release-notes-1.3.20.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Bump `commonsio` to 2.16.0 ([#16780](https://github.com/opensearch-project/OpenSearch/pull/16780))
- Bump `protobuf-java` to 3.25.5 ([#16792](https://github.com/opensearch-project/OpenSearch/pull/16792))
- Bump `snappy-java` to 1.1.10.7 ([#16792](https://github.com/opensearch-project/OpenSearch/pull/16792))
- Bump `mime4j-core` to 0.8.11 ([#16810](https://github.com/opensearch-project/OpenSearch/pull/16810))

### Fixed
- Update help output for _cat ([#14722](https://github.com/opensearch-project/OpenSearch/pull/14722))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,20 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;
import java.util.List;
import java.util.Set;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {
Expand Down Expand Up @@ -82,4 +88,47 @@ public void testReplication() throws Exception {
waitForSearchableDocs(docCount, primary, replica);
}

public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final List<String> nodes = internalCluster().startDataOnlyNodes(2);
createIndex(
INDEX_NAME,
Settings.builder()
.put("number_of_shards", 1)
.put("number_of_replicas", 0)
.put("number_of_search_only_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen(INDEX_NAME);

final int docCount = 5;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, nodes);

SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();

// Verify the number of indices
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().size());
// Verify total shards
assertEquals(2, segmentReplicationStatsResponse.getTotalShards());
// Verify the number of primary shards
assertEquals(1, segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).size());

SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get(INDEX_NAME).get(0);
Set<SegmentReplicationShardStats> replicaStats = perGroupStats.getReplicaStats();
// Verify the number of replica stats
assertEquals(1, replicaStats.size());
for (SegmentReplicationShardStats replicaStat : replicaStats) {
assertNotNull(replicaStat.getCurrentReplicationState());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.breaker.ResponseLimitBreachedException;
import org.opensearch.common.breaker.ResponseLimitSettings;
import org.opensearch.common.inject.Inject;
Expand All @@ -27,6 +29,7 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

import java.util.List;
import java.util.Objects;

import static org.opensearch.common.breaker.ResponseLimitSettings.LimitEntity.SHARDS;
Expand Down Expand Up @@ -98,18 +101,19 @@ public void onResponse(ClusterStateResponse clusterStateResponse) {
shardsRequest.getPageParams(),
clusterStateResponse
);
String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: paginationStrategy.getRequestedIndices().toArray(new String[0]);
catShardsResponse.setNodes(clusterStateResponse.getState().getNodes());
catShardsResponse.setResponseShards(
Objects.isNull(paginationStrategy)
? clusterStateResponse.getState().routingTable().allShards()
: paginationStrategy.getRequestedEntities()
);
catShardsResponse.setPageToken(Objects.isNull(paginationStrategy) ? null : paginationStrategy.getResponseToken());

String[] indices = Objects.isNull(paginationStrategy)
? shardsRequest.getIndices()
: filterClosedIndices(clusterStateResponse.getState(), paginationStrategy.getRequestedIndices());
// For paginated queries, if strategy outputs no shards to be returned, avoid fetching IndicesStats.
if (shouldSkipIndicesStatsRequest(paginationStrategy)) {
if (shouldSkipIndicesStatsRequest(paginationStrategy, indices)) {
catShardsResponse.setIndicesStatsResponse(IndicesStatsResponse.getEmptyResponse());
cancellableListener.onResponse(catShardsResponse);
return;
Expand Down Expand Up @@ -166,7 +170,19 @@ private void validateRequestLimit(
}
}

private boolean shouldSkipIndicesStatsRequest(ShardPaginationStrategy paginationStrategy) {
return Objects.nonNull(paginationStrategy) && paginationStrategy.getRequestedEntities().isEmpty();
private boolean shouldSkipIndicesStatsRequest(ShardPaginationStrategy paginationStrategy, String[] indices) {
return Objects.nonNull(paginationStrategy) && (indices == null || indices.length == 0);
}

/**
* Will be used by paginated query (_list/shards) to filter out closed indices (only consider OPEN) before fetching
* IndicesStats. Since pagination strategy always passes concrete indices to TransportIndicesStatsAction,
* the default behaviour of StrictExpandOpenAndForbidClosed leads to errors if closed indices are encountered.
*/
private String[] filterClosedIndices(ClusterState clusterState, List<String> strategyIndices) {
return strategyIndices.stream().filter(index -> {
IndexMetadata metadata = clusterState.metadata().indices().get(index);
return metadata != null && metadata.getState().equals(IndexMetadata.State.CLOSE) == false;
}).toArray(String[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationShardStats;
Expand All @@ -38,7 +37,9 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Transport action for shard segment replication operation. This transport action does not actually
Expand Down Expand Up @@ -96,11 +97,11 @@ protected SegmentReplicationStatsResponse newResponse(
) {
String[] shards = request.shards();
final List<Integer> shardsToFetch = Arrays.stream(shards).map(Integer::valueOf).collect(Collectors.toList());

// organize replica responses by allocationId.
final Map<String, SegmentReplicationState> replicaStats = new HashMap<>();
// map of index name to list of replication group stats.
final Map<String, List<SegmentReplicationPerGroupStats>> primaryStats = new HashMap<>();

for (SegmentReplicationShardStatsResponse response : responses) {
if (response != null) {
if (response.getReplicaStats() != null) {
Expand All @@ -109,6 +110,7 @@ protected SegmentReplicationStatsResponse newResponse(
replicaStats.putIfAbsent(shardRouting.allocationId().getId(), response.getReplicaStats());
}
}

if (response.getPrimaryStats() != null) {
final ShardId shardId = response.getPrimaryStats().getShardId();
if (shardsToFetch.isEmpty() || shardsToFetch.contains(shardId.getId())) {
Expand All @@ -126,15 +128,20 @@ protected SegmentReplicationStatsResponse newResponse(
}
}
}
// combine the replica stats to the shard stat entry in each group.
for (Map.Entry<String, List<SegmentReplicationPerGroupStats>> entry : primaryStats.entrySet()) {
for (SegmentReplicationPerGroupStats group : entry.getValue()) {
for (SegmentReplicationShardStats replicaStat : group.getReplicaStats()) {
replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null));
}
}
}
return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, primaryStats, shardFailures);

Map<String, List<SegmentReplicationPerGroupStats>> replicationStats = primaryStats.entrySet()
.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue()
.stream()
.map(groupStats -> updateGroupStats(groupStats, replicaStats))
.collect(Collectors.toList())
)
);

return new SegmentReplicationStatsResponse(totalShards, successfulShards, failedShards, replicationStats, shardFailures);
}

@Override
Expand All @@ -144,9 +151,8 @@ protected SegmentReplicationStatsRequest readRequestFrom(StreamInput in) throws

@Override
protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplicationStatsRequest request, ShardRouting shardRouting) {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
ShardId shardId = shardRouting.shardId();
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());

if (indexShard.indexSettings().isSegRepEnabledOrRemoteNode() == false) {
return null;
Expand All @@ -156,11 +162,7 @@ protected SegmentReplicationShardStatsResponse shardOperation(SegmentReplication
return new SegmentReplicationShardStatsResponse(pressureService.getStatsForShard(indexShard));
}

// return information about only on-going segment replication events.
if (request.activeOnly()) {
return new SegmentReplicationShardStatsResponse(targetService.getOngoingEventSegmentReplicationState(shardId));
}
return new SegmentReplicationShardStatsResponse(targetService.getSegmentReplicationState(shardId));
return new SegmentReplicationShardStatsResponse(getSegmentReplicationState(shardId, request.activeOnly()));
}

@Override
Expand All @@ -181,4 +183,83 @@ protected ClusterBlockException checkRequestBlock(
) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}

private SegmentReplicationPerGroupStats updateGroupStats(
SegmentReplicationPerGroupStats groupStats,
Map<String, SegmentReplicationState> replicaStats
) {
// Update the SegmentReplicationState for each of the replicas
Set<SegmentReplicationShardStats> updatedReplicaStats = groupStats.getReplicaStats()
.stream()
.peek(replicaStat -> replicaStat.setCurrentReplicationState(replicaStats.getOrDefault(replicaStat.getAllocationId(), null)))
.collect(Collectors.toSet());

// Compute search replica stats
Set<SegmentReplicationShardStats> searchReplicaStats = computeSearchReplicaStats(groupStats.getShardId(), replicaStats);

// Combine ReplicaStats and SearchReplicaStats
Set<SegmentReplicationShardStats> combinedStats = Stream.concat(updatedReplicaStats.stream(), searchReplicaStats.stream())
.collect(Collectors.toSet());

return new SegmentReplicationPerGroupStats(groupStats.getShardId(), combinedStats, groupStats.getRejectedRequestCount());
}

private Set<SegmentReplicationShardStats> computeSearchReplicaStats(
ShardId shardId,
Map<String, SegmentReplicationState> replicaStats
) {
return replicaStats.values()
.stream()
.filter(segmentReplicationState -> segmentReplicationState.getShardRouting().shardId().equals(shardId))
.filter(segmentReplicationState -> segmentReplicationState.getShardRouting().isSearchOnly())
.map(segmentReplicationState -> {
ShardRouting shardRouting = segmentReplicationState.getShardRouting();
SegmentReplicationShardStats segmentReplicationStats = computeSegmentReplicationShardStats(shardRouting);
segmentReplicationStats.setCurrentReplicationState(segmentReplicationState);
return segmentReplicationStats;
})
.collect(Collectors.toSet());
}

SegmentReplicationShardStats computeSegmentReplicationShardStats(ShardRouting shardRouting) {
ShardId shardId = shardRouting.shardId();
SegmentReplicationState completedSegmentReplicationState = targetService.getlatestCompletedEventSegmentReplicationState(shardId);
SegmentReplicationState ongoingSegmentReplicationState = targetService.getOngoingEventSegmentReplicationState(shardId);

return new SegmentReplicationShardStats(
shardRouting.allocationId().getId(),
0,
calculateBytesRemainingToReplicate(ongoingSegmentReplicationState),
0,
getCurrentReplicationLag(ongoingSegmentReplicationState),
getLastCompletedReplicationLag(completedSegmentReplicationState)
);
}

private SegmentReplicationState getSegmentReplicationState(ShardId shardId, boolean isActiveOnly) {
if (isActiveOnly) {
return targetService.getOngoingEventSegmentReplicationState(shardId);
} else {
return targetService.getSegmentReplicationState(shardId);
}
}

private long calculateBytesRemainingToReplicate(SegmentReplicationState ongoingSegmentReplicationState) {
if (ongoingSegmentReplicationState == null) {
return 0;
}
return ongoingSegmentReplicationState.getIndex()
.fileDetails()
.stream()
.mapToLong(index -> index.length() - index.recovered())
.sum();
}

private long getCurrentReplicationLag(SegmentReplicationState ongoingSegmentReplicationState) {
return ongoingSegmentReplicationState != null ? ongoingSegmentReplicationState.getTimer().time() : 0;
}

private long getLastCompletedReplicationLag(SegmentReplicationState completedSegmentReplicationState) {
return completedSegmentReplicationState != null ? completedSegmentReplicationState.getTimer().time() : 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,17 @@ void balance() {
final Map<String, Integer> nodePrimaryShardCount = calculateNodePrimaryShardCount(remoteRoutingNodes);
int totalPrimaryShardCount = nodePrimaryShardCount.values().stream().reduce(0, Integer::sum);

totalPrimaryShardCount += routingNodes.unassigned().getNumPrimaries();
int avgPrimaryPerNode = (totalPrimaryShardCount + routingNodes.size() - 1) / routingNodes.size();
int unassignedRemotePrimaryShardCount = 0;
for (ShardRouting shard : routingNodes.unassigned()) {
if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shard, allocation)) && shard.primary()) {
unassignedRemotePrimaryShardCount++;
}
}
totalPrimaryShardCount += unassignedRemotePrimaryShardCount;
final int avgPrimaryPerNode = (totalPrimaryShardCount + remoteRoutingNodes.size() - 1) / remoteRoutingNodes.size();

ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> sourceNodes = new ArrayDeque<>();
final ArrayDeque<RoutingNode> targetNodes = new ArrayDeque<>();
for (RoutingNode node : remoteRoutingNodes) {
if (nodePrimaryShardCount.get(node.nodeId()) > avgPrimaryPerNode) {
sourceNodes.add(node);
Expand Down
Loading

0 comments on commit 5714d79

Please sign in to comment.