Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Add Segment Replication backpressure rejection stats to _nodes/stats #10656

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Bind SegmentReplicarionStatsTracker in Node.java
Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Oct 16, 2023
commit 7da3cfb8bdc79c5d76e49727739ceab342a9bf80
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ public SegmentReplicationPressureService(
ClusterService clusterService,
IndicesService indicesService,
ShardStateAction shardStateAction,
SegmentReplicationStatsTracker tracker,
ThreadPool threadPool
) {
this.indicesService = indicesService;
this.tracker = new SegmentReplicationStatsTracker(this.indicesService);
this.tracker = tracker;
this.shardStateAction = shardStateAction;
this.threadPool = threadPool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.index;

import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -28,6 +29,7 @@ public class SegmentReplicationStatsTracker {
private final IndicesService indicesService;
private final Map<ShardId, AtomicInteger> rejectionCount;

@Inject
mch2 marked this conversation as resolved.
Show resolved Hide resolved
public SegmentReplicationStatsTracker(IndicesService indicesService) {
this.indicesService = indicesService;
rejectionCount = ConcurrentCollections.newConcurrentMap();
Expand Down
8 changes: 3 additions & 5 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.opensearch.cluster.InternalClusterInfoService;
import org.opensearch.cluster.NodeConnectionsService;
import org.opensearch.cluster.action.index.MappingUpdatedAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.cluster.metadata.AliasValidator;
import org.opensearch.cluster.metadata.IndexTemplateMetadata;
Expand Down Expand Up @@ -137,7 +136,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
Expand Down Expand Up @@ -966,8 +965,7 @@ protected Node(
transportService.getTaskManager()
);

ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, clusterModule.getAllocationService(), rerouteService, threadPool);
final SegmentReplicationPressureService segmentReplicationPressureService =new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, threadPool);
final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class),
Expand Down Expand Up @@ -1107,7 +1105,7 @@ protected Node(
fileCache,
taskCancellationMonitoringService,
resourceUsageCollectorService,
segmentReplicationPressureService
segmentReplicationStatsTracker
);

final SearchService searchService = newSearchService(
Expand Down
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.opensearch.discovery.Discovery;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
Expand Down Expand Up @@ -95,7 +95,7 @@ public class NodeService implements Closeable {
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;

private final SegmentReplicationPressureService segmentReplicationPressureService;
private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

NodeService(
Settings settings,
Expand All @@ -120,7 +120,7 @@ public class NodeService implements Closeable {
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService,
SegmentReplicationPressureService segmentReplicationPressureService
SegmentReplicationStatsTracker segmentReplicationStatsTracker
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -146,7 +146,7 @@ public class NodeService implements Closeable {
this.resourceUsageCollectorService = resourceUsageCollectorService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationPressureService = segmentReplicationPressureService;
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
}

public NodeInfo info(
Expand Down Expand Up @@ -227,7 +227,7 @@ public NodeStats stats(
boolean taskCancellation,
boolean searchPipelineStats,
boolean resourceUsageStats,
boolean segmentReplicationBackPressureStats
boolean segmentReplicationTrackerStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -257,7 +257,7 @@ public NodeStats stats(
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationBackPressureStats ? this.segmentReplicationPressureService.nodeStats() : null
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getStats() : null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ private SegmentReplicationPressureService buildPressureService(Settings settings
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));

return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class));
return new SegmentReplicationPressureService(
settings,
clusterService,
indicesService,
shardStateAction,
new SegmentReplicationStatsTracker(indicesService),
mock(ThreadPool.class)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
import org.opensearch.gateway.TransportNodesListGatewayStartedShards;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.remote.RemoteStorePressureService;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
Expand Down Expand Up @@ -2186,6 +2187,7 @@ public void onFailure(final Exception e) {
clusterService,
mock(IndicesService.class),
mock(ShardStateAction.class),
mock(SegmentReplicationStatsTracker.class),
mock(ThreadPool.class)
),
mock(RemoteStorePressureService.class),
Expand Down