Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource usage trackers and resource usage collector service #9890

Merged
merged 23 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
13c5c90
Add node performance trackers and performance collector service
bharath-techie Sep 7, 2023
f861207
addressing comments
bharath-techie Sep 8, 2023
5fb7d51
Addressing comments
bharath-techie Sep 11, 2023
f867acf
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Sep 11, 2023
cdc11f0
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Sep 15, 2023
03b58cb
renaming stats class and injecting settings to trackers
bharath-techie Sep 24, 2023
39f4e0c
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Sep 27, 2023
320a71f
addressing review comments
bharath-techie Sep 27, 2023
4174432
addressing comments
bharath-techie Sep 28, 2023
12a191f
addressing comments
bharath-techie Sep 29, 2023
7ef52fe
addressing comments
bharath-techie Sep 29, 2023
6a01876
addressing comments, removing perf collector dependency in node perfo…
bharath-techie Oct 5, 2023
e7b0c49
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Oct 5, 2023
768eea4
Addressing comments
bharath-techie Oct 6, 2023
4e82164
addressing comment
bharath-techie Oct 9, 2023
ef42c3d
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Oct 9, 2023
6d71414
changing elapsed_time to timestamp
bharath-techie Oct 9, 2023
1ab5ec4
addressing comments
bharath-techie Oct 9, 2023
bc3a2cc
Refactoring test back to *Tests format as *IT is not recognized
bharath-techie Oct 12, 2023
6c02a91
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Oct 12, 2023
757f9d2
Renaming files and packages to ResourceUsage
bharath-techie Oct 13, 2023
690c168
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Oct 13, 2023
bf7f65b
Merge branch 'main' of github.com:opensearch-project/OpenSearch into …
bharath-techie Oct 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
renaming stats class and injecting settings to trackers
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Sep 24, 2023
commit 03b58cb31ac6a1240a73ac0524c22ccb6ce99ca2
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesPerformanceStats;
import org.opensearch.node.GlobalPerformanceStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
Expand Down Expand Up @@ -144,7 +144,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
private SearchPipelineStats searchPipelineStats;

@Nullable
private NodesPerformanceStats nodesPerformanceStats;
private GlobalPerformanceStats globalPerformanceStats;
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved

public NodeStats(StreamInput in) throws IOException {
super(in);
Expand Down Expand Up @@ -203,9 +203,9 @@ public NodeStats(StreamInput in) throws IOException {
searchPipelineStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // make it 2.11 when we backport
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
nodesPerformanceStats = in.readOptionalWriteable(NodesPerformanceStats::new);
globalPerformanceStats = in.readOptionalWriteable(GlobalPerformanceStats::new);
} else {
nodesPerformanceStats = null;
globalPerformanceStats = null;
}
}

Expand All @@ -225,7 +225,7 @@ public NodeStats(
@Nullable DiscoveryStats discoveryStats,
@Nullable IngestStats ingestStats,
@Nullable AdaptiveSelectionStats adaptiveSelectionStats,
@Nullable NodesPerformanceStats nodesPerformanceStats,
@Nullable GlobalPerformanceStats globalPerformanceStats,
@Nullable ScriptCacheStats scriptCacheStats,
@Nullable IndexingPressureStats indexingPressureStats,
@Nullable ShardIndexingPressureStats shardIndexingPressureStats,
Expand All @@ -250,7 +250,7 @@ public NodeStats(
this.scriptStats = scriptStats;
this.discoveryStats = discoveryStats;
this.ingestStats = ingestStats;
this.nodesPerformanceStats = nodesPerformanceStats;
this.globalPerformanceStats = globalPerformanceStats;
this.scriptCacheStats = scriptCacheStats;
this.indexingPressureStats = indexingPressureStats;
this.shardIndexingPressureStats = shardIndexingPressureStats;
Expand Down Expand Up @@ -356,8 +356,8 @@ public AdaptiveSelectionStats getAdaptiveSelectionStats() {
}

@Nullable
public NodesPerformanceStats getNodesPerformanceStats() {
return nodesPerformanceStats;
public GlobalPerformanceStats getNodesPerformanceStats() {
return globalPerformanceStats;
}

@Nullable
Expand Down Expand Up @@ -447,7 +447,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(searchPipelineStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO : make it 2.11 when we backport
out.writeOptionalWriteable(nodesPerformanceStats);
out.writeOptionalWriteable(globalPerformanceStats);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public enum Metric {
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline"),
NODES_PERFORMANCE_STATS("nodes_performance_stats");
GLOBAL_PERFORMANCE_STATS("performance_stats");
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.NODES_PERFORMANCE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.GLOBAL_PERFORMANCE_STATS.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,37 +21,40 @@
import java.util.concurrent.TimeUnit;

/**
* This class represents collected performance stats of all downstream nodes and the local node
* This class represents performance stats such as CPU, Memory and IO resource usage of each node along with the time
* elapsed from when the stats were recorded.
*/
public class NodesPerformanceStats implements Writeable, ToXContentFragment {
private final Map<String, NodePerformanceStatistics> nodePerfStats;
public class GlobalPerformanceStats implements Writeable, ToXContentFragment {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved

public NodesPerformanceStats(Map<String, NodePerformanceStatistics> nodePerfStats) {
this.nodePerfStats = nodePerfStats;
// Map of node id to perf stats of the corresponding node.
private final Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap;

public GlobalPerformanceStats(Map<String, NodePerformanceStatistics> nodeIdToPerfStatsMap) {
this.nodeIdToPerfStatsMap = nodeIdToPerfStatsMap;
}

public NodesPerformanceStats(StreamInput in) throws IOException {
this.nodePerfStats = in.readMap(StreamInput::readString, NodePerformanceStatistics::new);
public GlobalPerformanceStats(StreamInput in) throws IOException {
this.nodeIdToPerfStatsMap = in.readMap(StreamInput::readString, NodePerformanceStatistics::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.nodePerfStats, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
out.writeMap(this.nodeIdToPerfStatsMap, StreamOutput::writeString, (stream, stats) -> stats.writeTo(stream));
}

/**
* Returns map of node id to perf stats
* Returns map of node id to perf stats of the corresponding node.
*/
public Map<String, NodePerformanceStatistics> getNodeIdToNodePerfStatsMap() {
return nodePerfStats;
return nodeIdToPerfStatsMap;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes_performance_stats");
for (String nodeId : nodePerfStats.keySet()) {
builder.startObject("performance_stats");
for (String nodeId : nodeIdToPerfStatsMap.keySet()) {
builder.startObject(nodeId);
NodePerformanceStatistics perfStats = nodePerfStats.get(nodeId);
NodePerformanceStatistics perfStats = nodeIdToPerfStatsMap.get(nodeId);
if (perfStats != null) {

builder.field("cpu_utilization_percent", String.format(Locale.ROOT, "%.1f", perfStats.cpuUtilizationPercent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public Optional<NodePerformanceStatistics> getNodeStatistics(final String nodeId
return Optional.ofNullable(nodeIdToPerfStats.get(nodeId)).map(perfStats -> new NodePerformanceStatistics(perfStats));
}

public NodesPerformanceStats stats() {
return new NodesPerformanceStats(getAllNodeStatistics());
public GlobalPerformanceStats stats() {
return new GlobalPerformanceStats(getAllNodeStatistics());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -22,17 +21,8 @@
public class AverageCpuUsageTracker extends AbstractAverageUsageTracker {
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger LOGGER = LogManager.getLogger(AverageCpuUsageTracker.class);

public AverageCpuUsageTracker(
ThreadPool threadPool,
TimeValue pollingInterval,
TimeValue windowDuration,
ClusterSettings clusterSettings
) {
public AverageCpuUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
super(threadPool, pollingInterval, windowDuration);
clusterSettings.addSettingsUpdateConsumer(
PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
this::setWindowDuration
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

Expand All @@ -27,17 +26,8 @@ public class AverageMemoryUsageTracker extends AbstractAverageUsageTracker {

private static final MemoryMXBean MEMORY_MX_BEAN = ManagementFactory.getMemoryMXBean();

public AverageMemoryUsageTracker(
ThreadPool threadPool,
TimeValue pollingInterval,
TimeValue windowDuration,
ClusterSettings clusterSettings
) {
public AverageMemoryUsageTracker(ThreadPool threadPool, TimeValue pollingInterval, TimeValue windowDuration) {
super(threadPool, pollingInterval, windowDuration);
clusterSettings.addSettingsUpdateConsumer(
PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,
this::setWindowDuration
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,22 @@ void initialize() {
cpuUsageTracker = new AverageCpuUsageTracker(
threadPool,
performanceTrackerSettings.getCpuPollingInterval(),
performanceTrackerSettings.getCpuWindowDuration(),
clusterSettings
performanceTrackerSettings.getCpuWindowDuration()
);

clusterSettings.addSettingsUpdateConsumer(
PerformanceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING,
cpuUsageTracker::setWindowDuration
);

memoryUsageTracker = new AverageMemoryUsageTracker(
threadPool,
performanceTrackerSettings.getMemoryPollingInterval(),
performanceTrackerSettings.getMemoryWindowDuration(),
clusterSettings
performanceTrackerSettings.getMemoryWindowDuration()
);
clusterSettings.addSettingsUpdateConsumer(
PerformanceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING,
memoryUsageTracker::setWindowDuration
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
import org.opensearch.monitor.os.OsStats;
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.GlobalPerformanceStats;
import org.opensearch.node.NodePerformanceStatistics;
import org.opensearch.node.NodesPerformanceStats;
import org.opensearch.node.ResponseCollectorService;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
Expand Down Expand Up @@ -394,14 +394,14 @@ public void testSerialization() throws IOException {
assertEquals(aStats.responseTime, bStats.responseTime, 0.01);
});
}
NodesPerformanceStats nodesPerformanceStats = nodeStats.getNodesPerformanceStats();
NodesPerformanceStats deserializedNodePerfStats = deserializedNodeStats.getNodesPerformanceStats();
if (nodesPerformanceStats == null) {
GlobalPerformanceStats globalPerformanceStats = nodeStats.getNodesPerformanceStats();
GlobalPerformanceStats deserializedNodePerfStats = deserializedNodeStats.getNodesPerformanceStats();
if (globalPerformanceStats == null) {
assertNull(deserializedNodePerfStats);
} else {
nodesPerformanceStats.getNodeIdToNodePerfStatsMap().forEach((k, v) -> {
NodePerformanceStatistics aPerfStats = nodesPerformanceStats.getNodeIdToNodePerfStatsMap().get(k);
NodePerformanceStatistics bPerfStats = nodesPerformanceStats.getNodeIdToNodePerfStatsMap().get(k);
globalPerformanceStats.getNodeIdToNodePerfStatsMap().forEach((k, v) -> {
NodePerformanceStatistics aPerfStats = globalPerformanceStats.getNodeIdToNodePerfStatsMap().get(k);
NodePerformanceStatistics bPerfStats = globalPerformanceStats.getNodeIdToNodePerfStatsMap().get(k);
assertEquals(aPerfStats.getMemoryUtilizationPercent(), bPerfStats.getMemoryUtilizationPercent(), 0.0);
assertEquals(aPerfStats.getCpuUtilizationPercent(), bPerfStats.getCpuUtilizationPercent(), 0.0);
assertEquals(aPerfStats.getTimestamp(), bPerfStats.getTimestamp());
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -770,7 +770,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
}
adaptiveSelectionStats = new AdaptiveSelectionStats(nodeConnections, nodeStats);
}
NodesPerformanceStats nodesPerformanceStats = null;
GlobalPerformanceStats globalPerformanceStats = null;
if (frequently()) {
int numNodes = randomIntBetween(0, 10);
Map<String, Long> nodeConnections = new HashMap<>();
Expand All @@ -792,7 +792,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
nodePerfStats.put(nodeId, stats);
}
}
nodesPerformanceStats = new NodesPerformanceStats(nodePerfStats);
globalPerformanceStats = new GlobalPerformanceStats(nodePerfStats);
}
ClusterManagerThrottlingStats clusterManagerThrottlingStats = null;
if (frequently()) {
Expand Down Expand Up @@ -825,7 +825,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) {
discoveryStats,
ingestStats,
adaptiveSelectionStats,
nodesPerformanceStats,
globalPerformanceStats,
scriptCacheStats,
null,
null,
Expand Down