Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cluster state stats #10670

Merged
merged 12 commits into from
Oct 24, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.NoClusterManagerBlockService;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.MediaTypeRegistry;
Expand Down Expand Up @@ -199,6 +200,8 @@ public void testIsolateClusterManagerAndVerifyClusterStateConsensus() throws Exc
}

}
ClusterStateStats clusterStateStats = internalCluster().clusterService().getClusterManagerService().getClusterStateStats();
assertTrue(clusterStateStats.getUpdateFailed() > 0);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
Expand All @@ -19,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
Expand Down Expand Up @@ -94,6 +98,45 @@ public void testFullClusterRestoreStaleDelete() throws Exception {
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

public void testRemoteStateStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
String clusterManagerNode = internalCluster().getClusterManagerName();
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);

// Fetch _nodes/stats
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats(clusterManagerNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();

// assert cluster state stats
DiscoveryStats discoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();

assertNotNull(discoveryStats.getClusterStateStats());
assertTrue(discoveryStats.getClusterStateStats().getUpdateSuccess() > 1);
assertEquals(0, discoveryStats.getClusterStateStats().getUpdateFailed());
assertTrue(discoveryStats.getClusterStateStats().getUpdateTotalTimeInMillis() > 0);
// assert remote state stats
assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getSuccessCount() > 1);
assertEquals(0, discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getFailedCount());
assertTrue(discoveryStats.getClusterStateStats().getPersistenceStats().get(0).getTotalTimeInMillis() > 0);

NodesStatsResponse nodesStatsResponseDataNode = client().admin()
.cluster()
.prepareNodesStats(dataNode)
.addMetric(NodesStatsRequest.Metric.DISCOVERY.metricName())
.get();
// assert cluster state stats for data node
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponseDataNode.getNodes().get(0).getDiscoveryStats();
assertNotNull(dataNodeDiscoveryStats.getClusterStateStats());
assertEquals(0, dataNodeDiscoveryStats.getClusterStateStats().getUpdateSuccess());
}

private void setReplicaCount(int replicaCount) {
client().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,12 @@ public interface PersistedState extends Closeable {
*/
void setLastAcceptedState(ClusterState clusterState);

/**
* Returns the stats for the persistence layer for {@link CoordinationState}.
* @return PersistedStateStats
*/
PersistedStateStats getStats();

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.cluster.service.ClusterApplier;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterStateStats;
import org.opensearch.common.Booleans;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
Expand Down Expand Up @@ -865,7 +866,16 @@ protected void doStart() {

@Override
public DiscoveryStats stats() {
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats());
ClusterStateStats clusterStateStats = clusterManagerService.getClusterStateStats();
ArrayList<PersistedStateStats> stats = new ArrayList<>();
Stream.of(PersistedStateRegistry.PersistedStateType.values()).forEach(stateType -> {
if (persistedStateRegistry.getPersistedState(stateType) != null
&& persistedStateRegistry.getPersistedState(stateType).getStats() != null) {
stats.add(persistedStateRegistry.getPersistedState(stateType).getStats());
}
});
clusterStateStats.setPersistenceStats(stats);
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public void setLastAcceptedState(ClusterState clusterState) {
this.acceptedState = clusterState;
}

@Override
public PersistedStateStats getStats() {
return null;
}

@Override
public long getCurrentTerm() {
return currentTerm;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Persisted cluster state related stats.
*
* @opensearch.internal
*/
public class PersistedStateStats implements Writeable, ToXContentObject {
private String statsName;
private AtomicLong totalTimeInMillis = new AtomicLong(0);
private AtomicLong failedCount = new AtomicLong(0);
private AtomicLong successCount = new AtomicLong(0);
private Map<String, AtomicLong> extendedFields = new HashMap<>(); // keeping minimal extensibility

public PersistedStateStats(String statsName) {
this.statsName = statsName;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(successCount.get());
out.writeVLong(failedCount.get());
out.writeVLong(totalTimeInMillis.get());
if (extendedFields.size() > 0) {
out.writeBoolean(true);
out.writeVInt(extendedFields.size());
for (Map.Entry<String, AtomicLong> extendedField : extendedFields.entrySet()) {
out.writeString(extendedField.getKey());
out.writeVLong(extendedField.getValue().get());
}
} else {
out.writeBoolean(false);
}
}

public PersistedStateStats(StreamInput in) throws IOException {
this.successCount = new AtomicLong(in.readVLong());
this.failedCount = new AtomicLong(in.readVLong());
this.totalTimeInMillis = new AtomicLong(in.readVLong());
if (in.readBoolean()) {
int extendedFieldsSize = in.readVInt();
this.extendedFields = new HashMap<>();
for (int fieldNumber = 0; fieldNumber < extendedFieldsSize; fieldNumber++) {
extendedFields.put(in.readString(), new AtomicLong(in.readVLong()));
}
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(statsName);
builder.field(Fields.SUCCESS_COUNT, getSuccessCount());
builder.field(Fields.FAILED_COUNT, getFailedCount());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getTotalTimeInMillis());
if (extendedFields.size() > 0) {
for (Map.Entry<String, AtomicLong> extendedField : extendedFields.entrySet()) {
builder.field(extendedField.getKey(), extendedField.getValue().get());
}
}
builder.endObject();
return builder;
}

public void stateFailed() {
failedCount.incrementAndGet();
}

public void stateSucceeded() {
successCount.incrementAndGet();
}

/**
* Expects user to send time taken in milliseconds.
*
* @param timeTakenInUpload time taken in uploading the cluster state to remote
*/
public void stateTook(long timeTakenInUpload) {
totalTimeInMillis.addAndGet(timeTakenInUpload);
}

public long getTotalTimeInMillis() {
return totalTimeInMillis.get();
}

public long getFailedCount() {
return failedCount.get();
}

public long getSuccessCount() {
return successCount.get();
}

protected void addToExtendedFields(String extendedField, AtomicLong extendedFieldValue) {
this.extendedFields.put(extendedField, extendedFieldValue);
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String SUCCESS_COUNT = "success_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.service;

import org.opensearch.cluster.coordination.PersistedStateStats;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
* Cluster state related stats.
*
* @opensearch.internal
*/
public class ClusterStateStats implements Writeable, ToXContentObject {

private AtomicLong updateSuccess = new AtomicLong(0);
private AtomicLong updateTotalTimeInMillis = new AtomicLong(0);
private AtomicLong updateFailed = new AtomicLong(0);
private List<PersistedStateStats> persistenceStats = new ArrayList<>();

public ClusterStateStats() {}

public long getUpdateSuccess() {
return updateSuccess.get();
}

public long getUpdateTotalTimeInMillis() {
return updateTotalTimeInMillis.get();
}

public long getUpdateFailed() {
return updateFailed.get();
}

public List<PersistedStateStats> getPersistenceStats() {
return persistenceStats;
}

public void stateUpdated() {
updateSuccess.incrementAndGet();
}

public void stateUpdateFailed() {
updateFailed.incrementAndGet();
}

public void stateUpdateTook(long stateUpdateTime) {
updateTotalTimeInMillis.addAndGet(stateUpdateTime);
}

public ClusterStateStats setPersistenceStats(List<PersistedStateStats> persistenceStats) {
this.persistenceStats = persistenceStats;
return this;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(updateSuccess.get());
out.writeVLong(updateTotalTimeInMillis.get());
out.writeVLong(updateFailed.get());
out.writeVInt(persistenceStats.size());
for (PersistedStateStats stats : persistenceStats) {
stats.writeTo(out);
}
}

public ClusterStateStats(StreamInput in) throws IOException {
this.updateSuccess = new AtomicLong(in.readVLong());
this.updateTotalTimeInMillis = new AtomicLong(in.readVLong());
this.updateFailed = new AtomicLong(in.readVLong());
int persistedStatsSize = in.readVInt();
this.persistenceStats = new ArrayList<>();
for (int statsNumber = 0; statsNumber < persistedStatsSize; statsNumber++) {
PersistedStateStats stats = new PersistedStateStats(in);
this.persistenceStats.add(stats);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CLUSTER_STATE_STATS);
builder.startObject(Fields.OVERALL);
builder.field(Fields.UPDATE_COUNT, getUpdateSuccess());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, getUpdateTotalTimeInMillis());
builder.field(Fields.FAILED_COUNT, getUpdateFailed());
builder.endObject();
for (PersistedStateStats stats : persistenceStats) {
stats.toXContent(builder, params);
}
builder.endObject();
return builder;
}

/**
* Fields for parsing and toXContent
*
* @opensearch.internal
*/
static final class Fields {
static final String CLUSTER_STATE_STATS = "cluster_state_stats";
static final String OVERALL = "overall";
static final String UPDATE_COUNT = "update_count";
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
static final String FAILED_COUNT = "failed_count";
}
}
Loading
Loading