Skip to content

Commit

Permalink
[Segment Replication] Add Segment Replication backpressure rejection …
Browse files Browse the repository at this point in the history
…stats to _nodes/stats (opensearch-project#10656)

* Initial WIP for adding segrep backpressure to node stats.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Bind SegmentReplicarionStatsTracker in Node.java

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* remove additional segrep backpressure info from node stats

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* fix metric name in node stats

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix compile error.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Fix compile errors.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Update java docs.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments on PR and fix compile errors.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments on PR.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Update unit test.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
Signed-off-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
  • Loading branch information
Rishikesh1159 authored Oct 21, 2023
1 parent 1e9ec52 commit 51626d0
Show file tree
Hide file tree
Showing 16 changed files with 186 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.SegmentReplicationRejectionStats;
import org.opensearch.index.stats.IndexingPressureStats;
import org.opensearch.index.stats.ShardIndexingPressureStats;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
Expand Down Expand Up @@ -129,6 +130,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private SearchBackpressureStats searchBackpressureStats;

@Nullable
private SegmentReplicationRejectionStats segmentReplicationRejectionStats;

@Nullable
private ClusterManagerThrottlingStats clusterManagerThrottlingStats;

Expand Down Expand Up @@ -211,6 +215,12 @@ public NodeStats(StreamInput in) throws IOException {
} else {
resourceUsageStats = null;
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
segmentReplicationRejectionStats = in.readOptionalWriteable(SegmentReplicationRejectionStats::new);
} else {
segmentReplicationRejectionStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
repositoriesStats = in.readOptionalWriteable(RepositoriesStats::new);
} else {
Expand Down Expand Up @@ -244,6 +254,7 @@ public NodeStats(
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats
) {
super(node);
Expand Down Expand Up @@ -271,6 +282,7 @@ public NodeStats(
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
}

Expand Down Expand Up @@ -415,6 +427,10 @@ public SearchPipelineStats getSearchPipelineStats() {
}

@Nullable
public SegmentReplicationRejectionStats getSegmentReplicationRejectionStats() {
return segmentReplicationRejectionStats;
}

public RepositoriesStats getRepositoriesStats() {
return repositoriesStats;
}
Expand Down Expand Up @@ -465,6 +481,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(resourceUsageStats);
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(segmentReplicationRejectionStats);
}
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(repositoriesStats);
}
Expand Down Expand Up @@ -561,6 +581,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getResourceUsageStats() != null) {
getResourceUsageStats().toXContent(builder, params);
}
if (getSegmentReplicationRejectionStats() != null) {
getSegmentReplicationRejectionStats().toXContent(builder, params);
}

if (getRepositoriesStats() != null) {
getRepositoriesStats().toXContent(builder, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public enum Metric {
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories");

private String metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ public SegmentReplicationPressureService(
ClusterService clusterService,
IndicesService indicesService,
ShardStateAction shardStateAction,
SegmentReplicationStatsTracker tracker,
ThreadPool threadPool
) {
this.indicesService = indicesService;
this.tracker = new SegmentReplicationStatsTracker(this.indicesService);
this.tracker = tracker;
this.shardStateAction = shardStateAction;
this.threadPool = threadPool;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Segment replication rejection stats.
*
* @opensearch.internal
*/
public class SegmentReplicationRejectionStats implements Writeable, ToXContentFragment {

/**
* Total rejections due to segment replication backpressure
*/
private long totalRejectionCount;

public SegmentReplicationRejectionStats(final long totalRejectionCount) {
this.totalRejectionCount = totalRejectionCount;
}

public SegmentReplicationRejectionStats(StreamInput in) throws IOException {
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
this.totalRejectionCount = in.readVLong();
}
}

public long getTotalRejectionCount() {
return totalRejectionCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("segment_replication_backpressure");
builder.field("total_rejected_requests", totalRejectionCount);
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeVLong(totalRejectionCount);
}
}

@Override
public String toString() {
return "SegmentReplicationRejectionStats{ totalRejectedRequestCount=" + totalRejectionCount + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public SegmentReplicationStatsTracker(IndicesService indicesService) {
rejectionCount = ConcurrentCollections.newConcurrentMap();
}

public SegmentReplicationRejectionStats getTotalRejectionStats() {
return new SegmentReplicationRejectionStats(this.rejectionCount.values().stream().mapToInt(AtomicInteger::get).sum());
}

protected Map<ShardId, AtomicInteger> getRejectionCount() {
return rejectionCount;
}

public SegmentReplicationStats getStats() {
Map<ShardId, SegmentReplicationPerGroupStats> stats = new HashMap<>();
for (IndexService indexService : indicesService) {
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.analysis.AnalysisRegistry;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
Expand Down Expand Up @@ -977,6 +978,7 @@ protected Node(
transportService.getTaskManager()
);

final SegmentReplicationStatsTracker segmentReplicationStatsTracker = new SegmentReplicationStatsTracker(indicesService);
RepositoriesModule repositoriesModule = new RepositoriesModule(
this.environment,
pluginsService.filterPlugins(RepositoryPlugin.class),
Expand Down Expand Up @@ -1116,6 +1118,7 @@ protected Node(
fileCache,
taskCancellationMonitoringService,
resourceUsageCollectorService,
segmentReplicationStatsTracker,
repositoryService
);

Expand Down Expand Up @@ -1246,6 +1249,7 @@ protected Node(
b.bind(MetricsRegistry.class).toInstance(metricsRegistry);
b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService);
b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry);
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
});
injector = modules.createInjector();

Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.discovery.Discovery;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.SegmentReplicationStatsTracker;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.ingest.IngestService;
Expand Down Expand Up @@ -96,6 +97,8 @@ public class NodeService implements Closeable {
private final TaskCancellationMonitoringService taskCancellationMonitoringService;
private final RepositoriesService repositoriesService;

private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

NodeService(
Settings settings,
ThreadPool threadPool,
Expand All @@ -119,6 +122,7 @@ public class NodeService implements Closeable {
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService,
SegmentReplicationStatsTracker segmentReplicationStatsTracker,
RepositoriesService repositoriesService
) {
this.settings = settings;
Expand Down Expand Up @@ -146,6 +150,7 @@ public class NodeService implements Closeable {
this.repositoriesService = repositoriesService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
}

public NodeInfo info(
Expand Down Expand Up @@ -226,6 +231,7 @@ public NodeStats stats(
boolean taskCancellation,
boolean searchPipelineStats,
boolean resourceUsageStats,
boolean segmentReplicationTrackerStats,
boolean repositoriesStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
Expand Down Expand Up @@ -256,6 +262,7 @@ public NodeStats stats(
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.http.HttpStats;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationRejectionStats;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.translog.RemoteTranslogStats;
Expand Down Expand Up @@ -417,6 +418,17 @@ public void testSerialization() throws IOException {
assertEquals(aResourceUsageStats.getTimestamp(), bResourceUsageStats.getTimestamp());
});
}
SegmentReplicationRejectionStats segmentReplicationRejectionStats = nodeStats.getSegmentReplicationRejectionStats();
SegmentReplicationRejectionStats deserializedSegmentReplicationRejectionStats = deserializedNodeStats
.getSegmentReplicationRejectionStats();
if (segmentReplicationRejectionStats == null) {
assertNull(deserializedSegmentReplicationRejectionStats);
} else {
assertEquals(
segmentReplicationRejectionStats.getTotalRejectionCount(),
deserializedSegmentReplicationRejectionStats.getTotalRejectionCount()
);
}
ScriptCacheStats scriptCacheStats = nodeStats.getScriptCacheStats();
ScriptCacheStats deserializedScriptCacheStats = deserializedNodeStats.getScriptCacheStats();
if (scriptCacheStats == null) {
Expand Down Expand Up @@ -812,6 +824,11 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
}
nodesResourceUsageStats = new NodesResourceUsageStats(resourceUsageStatsMap);
}
SegmentReplicationRejectionStats segmentReplicationRejectionStats = null;
if (frequently()) {
segmentReplicationRejectionStats = new SegmentReplicationRejectionStats(randomNonNegativeLong());
}

ClusterManagerThrottlingStats clusterManagerThrottlingStats = null;
if (frequently()) {
clusterManagerThrottlingStats = new ClusterManagerThrottlingStats();
Expand Down Expand Up @@ -853,6 +870,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
null,
null,
null,
segmentReplicationRejectionStats,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -220,6 +221,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -248,6 +250,7 @@ public void testFillDiskUsage() {
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -307,6 +310,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -335,6 +339,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
),
new NodeStats(
Expand Down Expand Up @@ -363,6 +368,7 @@ public void testFillDiskUsageSomeInvalidValues() {
null,
null,
null,
null,
null
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ private SegmentReplicationPressureService buildPressureService(Settings settings
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));

return new SegmentReplicationPressureService(settings, clusterService, indicesService, shardStateAction, mock(ThreadPool.class));
return new SegmentReplicationPressureService(
settings,
clusterService,
indicesService,
shardStateAction,
new SegmentReplicationStatsTracker(indicesService),
mock(ThreadPool.class)
);
}
}
Loading

0 comments on commit 51626d0

Please sign in to comment.