Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
Expand Down Expand Up @@ -239,6 +240,9 @@ private void scheduleTelemetryCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFCacheConfigMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFShardOperationCollector(
performanceAnalyzerController, configOverridesWrapper));
}

private void scheduleRcaCollectors() {
Expand Down Expand Up @@ -403,6 +407,8 @@ public Collection<Object> createComponents(
// initialize it. This is the earliest point at which we know ClusterService is created.
// So, call the initialize method here.
clusterSettingsManager.initialize();
// Initialize ShardMetricsCollector histograms
ShardMetricsCollector.INSTANCE.initialize();
return Collections.singletonList(performanceAnalyzerController);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer;

import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
* A singleton collector for recording per-shard CPU and heap metrics in OpenSearch. This class
* maintains two histograms:
*
* <ul>
* <li>CPU utilization histogram - tracks CPU usage per shard
* <li>Heap usage histogram - tracks heap memory allocation per shard
* </ul>
*
* The metrics are recorded with tags for better categorization and analysis.
*/
public final class ShardMetricsCollector {
    /** Singleton instance of the ShardMetricsCollector. */
    public static final ShardMetricsCollector INSTANCE = new ShardMetricsCollector();

    /** Metric name under which per-shard CPU utilization is published. */
    public static final String SHARD_CPU_UTILIZATION = "shard_cpu_utilization";

    /** Metric name under which per-shard heap allocation is published. */
    public static final String SHARD_HEAP_ALLOCATED = "shard_heap_allocated";

    /** Histogram for tracking CPU utilization; null until {@link #initialize()} succeeds. */
    private Histogram cpuUtilizationHistogram;

    /** Histogram for tracking heap usage; null until {@link #initialize()} succeeds. */
    private Histogram heapUsedHistogram;

    /**
     * Private constructor enforcing the singleton pattern. The histograms are deliberately left
     * null here: the metrics registry is not yet available at class-load time, so they are
     * created later via {@link #initialize()}.
     */
    private ShardMetricsCollector() {
        this.cpuUtilizationHistogram = null;
        this.heapUsedHistogram = null;
    }

    /**
     * Initialises the metric histograms if they have not been created yet. Safe to call more
     * than once: each histogram is created at most once, and a call made while the metrics
     * registry is still unavailable leaves it null, so recording stays a no-op until a later
     * call succeeds.
     *
     * <p>NOTE(review): initialization and recording may happen on different threads; these
     * fields are neither volatile nor guarded — confirm callers guarantee safe publication.
     */
    public void initialize() {
        if (this.cpuUtilizationHistogram == null) {
            this.cpuUtilizationHistogram =
                    createHistogram(
                            SHARD_CPU_UTILIZATION,
                            "CPU Utilization per shard for an operation",
                            RTFMetrics.MetricUnits.RATE.toString());
        }
        if (this.heapUsedHistogram == null) {
            this.heapUsedHistogram =
                    createHistogram(
                            SHARD_HEAP_ALLOCATED,
                            "Heap Utilization per shard for an operation",
                            RTFMetrics.MetricUnits.BYTE.toString());
        }
    }

    /**
     * Creates a histogram on the shared metrics registry.
     *
     * @param name metric name to register
     * @param description human-readable metric description
     * @param unit metric unit string
     * @return the created histogram, or null if the metrics registry is unavailable
     */
    private static Histogram createHistogram(String name, String description, String unit) {
        MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
        if (metricsRegistry != null) {
            return metricsRegistry.createHistogram(name, description, unit);
        }
        return null;
    }

    /**
     * Records a CPU utilization measurement with associated tags. No-op when the CPU histogram
     * has not been initialized.
     *
     * @param cpuUtilization the CPU utilization value to record
     * @param tags the tags to associate with this measurement (e.g., shard ID, operation type)
     */
    public void recordCpuUtilization(double cpuUtilization, Tags tags) {
        if (cpuUtilizationHistogram != null) {
            cpuUtilizationHistogram.record(cpuUtilization, tags);
        }
    }

    /**
     * Records a heap usage measurement with associated tags. No-op when the heap histogram has
     * not been initialized.
     *
     * @param heapBytes the heap usage value to record (in bytes)
     * @param tags the tags to associate with this measurement (e.g., shard ID, operation type)
     */
    public void recordHeapUsed(double heapBytes, Tags tags) {
        if (heapUsedHistogram != null) {
            heapUsedHistogram.record(heapBytes, tags);
        }
    }

    /** @return the CPU utilization histogram, or null if not yet initialized */
    public Histogram getCpuUtilizationHistogram() {
        return cpuUtilizationHistogram;
    }

    /** @return the heap usage histogram, or null if not yet initialized */
    public Histogram getHeapUsedHistogram() {
        return heapUsedHistogram;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -57,9 +58,7 @@ public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetri
private static final int KEYS_PATH_LENGTH = 2;
private static final Logger LOG =
LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class);
private HashMap<ShardId, IndexShard> currentShards;
private HashMap<ShardId, ShardStats> currentPerShardStats;
private HashMap<ShardId, ShardStats> prevPerShardStats;
private Map<ShardId, ShardStats> prevPerShardStats;
private final PerformanceAnalyzerController controller;

public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController controller) {
Expand All @@ -68,21 +67,10 @@ public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController co
"NodeStatsMetrics",
NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME,
NODESTATS_COLLECTION_ERROR);
currentShards = new HashMap<>();
prevPerShardStats = new HashMap<>();
currentPerShardStats = new HashMap<>();
this.controller = controller;
}

private void populateCurrentShards() {
if (!currentShards.isEmpty()) {
prevPerShardStats.putAll(currentPerShardStats);
currentPerShardStats.clear();
}
currentShards.clear();
currentShards = Utils.getShards();
}

private static final Map<String, ValueCalculator> maps =
new HashMap<String, ValueCalculator>() {
{
Expand Down Expand Up @@ -152,13 +140,13 @@ public void collectMetrics(long startTime) {
if (indicesService == null) {
return;
}
populateCurrentShards();
populatePerShardStats(indicesService);

for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = (ShardId) currentShard.getKey();
ShardStats currentShardStats = (ShardStats) currentShard.getValue();
if (prevPerShardStats.size() == 0) {
Map<ShardId, ShardStats> currentPerShardStats = populatePerShardStats(indicesService);

for (HashMap.Entry<ShardId, ShardStats> currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = currentShard.getKey();
ShardStats currentShardStats = currentShard.getValue();
if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) {
// Populating value for the first run.
populateMetricValue(
currentShardStats, startTime, shardId.getIndexName(), shardId.id());
Expand All @@ -179,6 +167,7 @@ public void collectMetrics(long startTime) {
populateDiffMetricValue(
prevValue, currValue, startTime, shardId.getIndexName(), shardId.id());
}
prevPerShardStats = currentPerShardStats;
}

// - Separated to have a unit test; and catch any code changes around this field
Expand All @@ -188,10 +177,12 @@ Field getNodeIndicesStatsByShardField() throws Exception {
return field;
}

public void populatePerShardStats(IndicesService indicesService) {
public Map<ShardId, ShardStats> populatePerShardStats(IndicesService indicesService) {
// Populate the shard stats per shard.
for (HashMap.Entry currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = (IndexShard) currentShard.getValue();
HashMap<ShardId, IndexShard> currentShards = Utils.getShards();
Map<ShardId, ShardStats> currentPerShardStats = new HashMap<>(Collections.emptyMap());
for (HashMap.Entry<ShardId, IndexShard> currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = currentShard.getValue();
IndexShardStats currentIndexShardStats =
Utils.indexShardStats(
indicesService,
Expand All @@ -200,20 +191,24 @@ public void populatePerShardStats(IndicesService indicesService) {
CommonStatsFlags.Flag.QueryCache,
CommonStatsFlags.Flag.FieldData,
CommonStatsFlags.Flag.RequestCache));
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
if (currentIndexShardStats != null) {
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
}
}
}
return currentPerShardStats;
}

public void populateMetricValue(
ShardStats shardStats, long startTime, String IndexName, int ShardId) {
StringBuilder value = new StringBuilder();
value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
// Populate the result with cache specific metrics only.
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize());
saveMetricValues(value.toString(), startTime, IndexName, String.valueOf(ShardId));
String value =
PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()
+
// Populate the result with cache specific metrics only.
PerformanceAnalyzerMetrics.sMetricNewLineDelimitor
+ new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize();
saveMetricValues(value, startTime, IndexName, String.valueOf(ShardId));
}

public void populateDiffMetricValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -42,8 +43,6 @@ public class RTFNodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMe
.samplingInterval;
private static final Logger LOG =
LogManager.getLogger(RTFNodeStatsAllShardsMetricsCollector.class);
private Map<ShardId, IndexShard> currentShards;
private Map<ShardId, ShardStats> currentPerShardStats;
private Map<ShardId, ShardStats> prevPerShardStats;
private MetricsRegistry metricsRegistry;
private Counter cacheQueryHitMetrics;
Expand All @@ -67,23 +66,12 @@ public RTFNodeStatsAllShardsMetricsCollector(
"RTFNodeStatsMetricsCollector",
RTF_NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME,
RTF_NODESTATS_COLLECTION_ERROR);
currentShards = new HashMap<>();
prevPerShardStats = new HashMap<>();
currentPerShardStats = new HashMap<>();
this.metricsInitialised = false;
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

private void populateCurrentShards() {
if (!currentShards.isEmpty()) {
prevPerShardStats.putAll(currentPerShardStats);
currentPerShardStats.clear();
}
currentShards.clear();
currentShards = Utils.getShards();
}

private static final ImmutableMap<String, ValueCalculator> valueCalculators =
ImmutableMap.of(
RTFMetrics.ShardStatsValue.INDEXING_THROTTLE_TIME.toString(),
Expand Down Expand Up @@ -117,7 +105,7 @@ private void populateCurrentShards() {
public void collectMetrics(long startTime) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info("RTFDisksCollector is disabled. Skipping collection.");
LOG.info("RTFNodeStatsMetricsCollector is disabled. Skipping collection.");
return;
}
IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService();
Expand All @@ -133,38 +121,30 @@ configOverridesWrapper, getCollectorName())) {

LOG.debug("Executing collect metrics for RTFNodeStatsAllShardsMetricsCollector");
initialiseMetricsIfNeeded();
populateCurrentShards();
populatePerShardStats(indicesService);

for (Map.Entry currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = (ShardId) currentShard.getKey();
ShardStats currentShardStats = (ShardStats) currentShard.getValue();
if (prevPerShardStats.size() == 0) {
// Populating value for the first run.
Map<ShardId, ShardStats> currentPerShardStats = populatePerShardStats(indicesService);

for (Map.Entry<ShardId, ShardStats> currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = currentShard.getKey();
ShardStats currentShardStats = currentShard.getValue();
if (prevPerShardStats.isEmpty() || prevPerShardStats.get(shardId) == null) {
// Populating value for the first run of shard.
recordMetrics(
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
shardId);
continue;
}
ShardStats prevShardStats = prevPerShardStats.get(shardId);
if (prevShardStats == null) {
// Populate value for shards which are new and were not present in the previous
// run.
recordMetrics(
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
shardId);
continue;
}
NodeStatsMetricsAllShardsPerCollectionStatus prevValue =
new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats);
NodeStatsMetricsAllShardsPerCollectionStatus currValue =
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats);
populateDiffMetricValue(prevValue, currValue, shardId);
}
prevPerShardStats = currentPerShardStats;
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
if (!metricsInitialised) {
cacheQueryHitMetrics =
metricsRegistry.createCounter(
RTFMetrics.ShardStatsValue.Constants.QUEY_CACHE_HIT_COUNT_VALUE,
Expand Down Expand Up @@ -222,10 +202,12 @@ private void initialiseMetricsIfNeeded() {
}
}

public void populatePerShardStats(IndicesService indicesService) {
public Map<ShardId, ShardStats> populatePerShardStats(IndicesService indicesService) {
// Populate the shard stats per shard.
for (Map.Entry currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = (IndexShard) currentShard.getValue();
Map<ShardId, IndexShard> currentShards = Utils.getShards();
Map<ShardId, ShardStats> currentPerShardStats = new HashMap<>(Collections.emptyMap());
for (Map.Entry<ShardId, IndexShard> currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = currentShard.getValue();
IndexShardStats currentIndexShardStats =
Utils.indexShardStats(
indicesService,
Expand All @@ -234,10 +216,13 @@ public void populatePerShardStats(IndicesService indicesService) {
CommonStatsFlags.Flag.QueryCache,
CommonStatsFlags.Flag.FieldData,
CommonStatsFlags.Flag.RequestCache));
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
if (currentIndexShardStats != null) {
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
}
}
}
return currentPerShardStats;
}

private void recordMetrics(
Expand Down
Loading
Loading