Skip to content

Commit a181a25

Browse files
authored
[CCR] Add time since last auto follow fetch to auto follow stats (#36542)
For each remote cluster the auto follow coordinator, starts an auto follower that checks the remote cluster state and determines whether an index needs to be auto followed. The time since last auto follow is reported per remote cluster and gives insight whether the auto follow process is alive. Relates to #33007 Originates from #35895
1 parent 6f03899 commit a181a25

File tree

11 files changed

+379
-33
lines changed

11 files changed

+379
-33
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/AutoFollowStats.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public final class AutoFollowStats {
3939
static final ParseField RECENT_AUTO_FOLLOW_ERRORS = new ParseField("recent_auto_follow_errors");
4040
static final ParseField LEADER_INDEX = new ParseField("leader_index");
4141
static final ParseField AUTO_FOLLOW_EXCEPTION = new ParseField("auto_follow_exception");
42+
static final ParseField AUTO_FOLLOWED_CLUSTERS = new ParseField("auto_followed_clusters");
43+
static final ParseField CLUSTER_NAME = new ParseField("cluster_name");
44+
static final ParseField TIME_SINCE_LAST_CHECK_MILLIS = new ParseField("time_since_last_check_millis");
45+
static final ParseField LAST_SEEN_METADATA_VERSION = new ParseField("last_seen_metadata_version");
4246

4347
@SuppressWarnings("unchecked")
4448
static final ConstructingObjectParser<AutoFollowStats, Void> STATS_PARSER = new ConstructingObjectParser<>("auto_follow_stats",
@@ -48,6 +52,10 @@ public final class AutoFollowStats {
4852
(Long) args[2],
4953
new TreeMap<>(
5054
((List<Map.Entry<String, ElasticsearchException>>) args[3])
55+
.stream()
56+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
57+
new TreeMap<>(
58+
((List<Map.Entry<String, AutoFollowedCluster>>) args[4])
5159
.stream()
5260
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
5361
));
@@ -57,33 +65,47 @@ public final class AutoFollowStats {
5765
"auto_follow_stats_errors",
5866
args -> new AbstractMap.SimpleEntry<>((String) args[0], (ElasticsearchException) args[1]));
5967

68+
private static final ConstructingObjectParser<Map.Entry<String, AutoFollowedCluster>, Void> AUTO_FOLLOWED_CLUSTERS_PARSER =
69+
new ConstructingObjectParser<>(
70+
"auto_followed_clusters",
71+
args -> new AbstractMap.SimpleEntry<>((String) args[0], new AutoFollowedCluster((Long) args[1], (Long) args[2])));
72+
6073
static {
6174
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
6275
AUTO_FOLLOW_EXCEPTIONS_PARSER.declareObject(
6376
ConstructingObjectParser.constructorArg(),
6477
(p, c) -> ElasticsearchException.fromXContent(p),
6578
AUTO_FOLLOW_EXCEPTION);
6679

80+
AUTO_FOLLOWED_CLUSTERS_PARSER.declareString(ConstructingObjectParser.constructorArg(), CLUSTER_NAME);
81+
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIME_SINCE_LAST_CHECK_MILLIS);
82+
AUTO_FOLLOWED_CLUSTERS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_SEEN_METADATA_VERSION);
83+
6784
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_INDICES_AUTO_FOLLOWED);
6885
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_REMOTE_CLUSTER_STATE_REQUESTS);
6986
STATS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_INDICES_AUTO_FOLLOWED);
7087
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOW_EXCEPTIONS_PARSER,
7188
RECENT_AUTO_FOLLOW_ERRORS);
89+
STATS_PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), AUTO_FOLLOWED_CLUSTERS_PARSER,
90+
AUTO_FOLLOWED_CLUSTERS);
7291
}
7392

7493
private final long numberOfFailedFollowIndices;
7594
private final long numberOfFailedRemoteClusterStateRequests;
7695
private final long numberOfSuccessfulFollowIndices;
7796
private final NavigableMap<String, ElasticsearchException> recentAutoFollowErrors;
97+
private final NavigableMap<String, AutoFollowedCluster> autoFollowedClusters;
7898

7999
AutoFollowStats(long numberOfFailedFollowIndices,
80100
long numberOfFailedRemoteClusterStateRequests,
81101
long numberOfSuccessfulFollowIndices,
82-
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors) {
102+
NavigableMap<String, ElasticsearchException> recentAutoFollowErrors,
103+
NavigableMap<String, AutoFollowedCluster> autoFollowedClusters) {
83104
this.numberOfFailedFollowIndices = numberOfFailedFollowIndices;
84105
this.numberOfFailedRemoteClusterStateRequests = numberOfFailedRemoteClusterStateRequests;
85106
this.numberOfSuccessfulFollowIndices = numberOfSuccessfulFollowIndices;
86107
this.recentAutoFollowErrors = recentAutoFollowErrors;
108+
this.autoFollowedClusters = autoFollowedClusters;
87109
}
88110

89111
public long getNumberOfFailedFollowIndices() {
@@ -102,4 +124,27 @@ public NavigableMap<String, ElasticsearchException> getRecentAutoFollowErrors()
102124
return recentAutoFollowErrors;
103125
}
104126

127+
public NavigableMap<String, AutoFollowedCluster> getAutoFollowedClusters() {
128+
return autoFollowedClusters;
129+
}
130+
131+
public static class AutoFollowedCluster {
132+
133+
private final long timeSinceLastCheckMillis;
134+
private final long lastSeenMetadataVersion;
135+
136+
public AutoFollowedCluster(long timeSinceLastCheckMillis, long lastSeenMetadataVersion) {
137+
this.timeSinceLastCheckMillis = timeSinceLastCheckMillis;
138+
this.lastSeenMetadataVersion = lastSeenMetadataVersion;
139+
}
140+
141+
public long getTimeSinceLastCheckMillis() {
142+
return timeSinceLastCheckMillis;
143+
}
144+
145+
public long getLastSeenMetadataVersion() {
146+
return lastSeenMetadataVersion;
147+
}
148+
}
149+
105150
}

client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/CcrStatsResponseTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.client.ccr;
2121

2222
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.client.ccr.AutoFollowStats.AutoFollowedCluster;
2324
import org.elasticsearch.client.ccr.IndicesFollowStats.ShardFollowStats;
2425
import org.elasticsearch.common.collect.Tuple;
2526
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -185,6 +186,19 @@ private static void toXContent(CcrStatsResponse response, XContentBuilder builde
185186
builder.endObject();
186187
}
187188
builder.endArray();
189+
builder.startArray(AutoFollowStats.AUTO_FOLLOWED_CLUSTERS.getPreferredName());
190+
for (Map.Entry<String, AutoFollowedCluster> entry : autoFollowStats.getAutoFollowedClusters().entrySet()) {
191+
builder.startObject();
192+
{
193+
builder.field(AutoFollowStats.CLUSTER_NAME.getPreferredName(), entry.getKey());
194+
builder.field(AutoFollowStats.TIME_SINCE_LAST_CHECK_MILLIS.getPreferredName(),
195+
entry.getValue().getTimeSinceLastCheckMillis());
196+
builder.field(AutoFollowStats.LAST_SEEN_METADATA_VERSION.getPreferredName(),
197+
entry.getValue().getLastSeenMetadataVersion());
198+
}
199+
builder.endObject();
200+
}
201+
builder.endArray();
188202
}
189203
builder.endObject();
190204

@@ -315,11 +329,16 @@ private static AutoFollowStats randomAutoFollowStats() {
315329
for (int i = 0; i < count; i++) {
316330
readExceptions.put("" + i, new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
317331
}
332+
final NavigableMap<String, AutoFollowedCluster> autoFollowClusters = new TreeMap<>();
333+
for (int i = 0; i < count; i++) {
334+
autoFollowClusters.put("" + i, new AutoFollowedCluster(randomLong(), randomNonNegativeLong()));
335+
}
318336
return new AutoFollowStats(
319337
randomNonNegativeLong(),
320338
randomNonNegativeLong(),
321339
randomNonNegativeLong(),
322-
readExceptions
340+
readExceptions,
341+
autoFollowClusters
323342
);
324343
}
325344

docs/reference/ccr/apis/get-ccr-stats.asciidoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ The API returns the following results:
105105
"number_of_failed_follow_indices" : 0,
106106
"number_of_failed_remote_cluster_state_requests" : 0,
107107
"number_of_successful_follow_indices" : 1,
108-
"recent_auto_follow_errors" : []
108+
"recent_auto_follow_errors" : [],
109+
"auto_followed_clusters" : []
109110
},
110111
"follow_stats" : {
111112
"indices" : [
@@ -151,6 +152,7 @@ The API returns the following results:
151152
// TESTRESPONSE[s/"number_of_failed_remote_cluster_state_requests" : 0/"number_of_failed_remote_cluster_state_requests" : $body.auto_follow_stats.number_of_failed_remote_cluster_state_requests/]
152153
// TESTRESPONSE[s/"number_of_successful_follow_indices" : 1/"number_of_successful_follow_indices" : $body.auto_follow_stats.number_of_successful_follow_indices/]
153154
// TESTRESPONSE[s/"recent_auto_follow_errors" : \[\]/"recent_auto_follow_errors" : $body.auto_follow_stats.recent_auto_follow_errors/]
155+
// TESTRESPONSE[s/"auto_followed_clusters" : \[\]/"auto_followed_clusters" : $body.auto_follow_stats.auto_followed_clusters/]
154156
// TESTRESPONSE[s/"leader_global_checkpoint" : 1024/"leader_global_checkpoint" : $body.follow_stats.indices.0.shards.0.leader_global_checkpoint/]
155157
// TESTRESPONSE[s/"leader_max_seq_no" : 1536/"leader_max_seq_no" : $body.follow_stats.indices.0.shards.0.leader_max_seq_no/]
156158
// TESTRESPONSE[s/"follower_global_checkpoint" : 768/"follower_global_checkpoint" : $body.follow_stats.indices.0.shards.0.follower_global_checkpoint/]

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public Collection<Object> createComponents(
156156

157157
return Arrays.asList(
158158
ccrLicenseChecker,
159-
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker)
159+
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
160160
);
161161
}
162162

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,12 @@
5252
import java.util.function.BiConsumer;
5353
import java.util.function.Consumer;
5454
import java.util.function.Function;
55+
import java.util.function.LongSupplier;
5556
import java.util.function.Supplier;
5657
import java.util.stream.Collectors;
5758

59+
import static org.elasticsearch.xpack.core.ccr.AutoFollowStats.AutoFollowedCluster;
60+
5861
/**
5962
* A component that runs only on the elected master node and follows leader indices automatically
6063
* if they match with a auto follow pattern that is defined in {@link AutoFollowMetadata}.
@@ -67,6 +70,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
6770
private final Client client;
6871
private final ClusterService clusterService;
6972
private final CcrLicenseChecker ccrLicenseChecker;
73+
private final LongSupplier relativeMillisTimeProvider;
7074

7175
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();
7276

@@ -79,10 +83,13 @@ public class AutoFollowCoordinator implements ClusterStateListener {
7983
public AutoFollowCoordinator(
8084
Client client,
8185
ClusterService clusterService,
82-
CcrLicenseChecker ccrLicenseChecker) {
86+
CcrLicenseChecker ccrLicenseChecker,
87+
LongSupplier relativeMillisTimeProvider) {
88+
8389
this.client = client;
8490
this.clusterService = clusterService;
8591
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker");
92+
this.relativeMillisTimeProvider = relativeMillisTimeProvider;
8693
clusterService.addListener(this);
8794
this.recentAutoFollowErrors = new LinkedHashMap<String, ElasticsearchException>() {
8895
@Override
@@ -93,11 +100,26 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
93100
}
94101

95102
public synchronized AutoFollowStats getStats() {
103+
final Map<String, AutoFollower> autoFollowers = this.autoFollowers;
104+
final TreeMap<String, AutoFollowedCluster> timesSinceLastAutoFollowPerRemoteCluster = new TreeMap<>();
105+
for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
106+
long lastAutoFollowTimeInMillis = entry.getValue().lastAutoFollowTimeInMillis;
107+
long lastSeenMetadataVersion = entry.getValue().metadataVersion;
108+
if (lastAutoFollowTimeInMillis != -1) {
109+
long timeSinceLastCheckInMillis = relativeMillisTimeProvider.getAsLong() - lastAutoFollowTimeInMillis;
110+
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(),
111+
new AutoFollowedCluster(timeSinceLastCheckInMillis, lastSeenMetadataVersion));
112+
} else {
113+
timesSinceLastAutoFollowPerRemoteCluster.put(entry.getKey(), new AutoFollowedCluster(-1L, lastSeenMetadataVersion));
114+
}
115+
}
116+
96117
return new AutoFollowStats(
97118
numberOfFailedIndicesAutoFollowed,
98119
numberOfFailedRemoteClusterStateRequests,
99120
numberOfSuccessfulIndicesAutoFollowed,
100-
new TreeMap<>(recentAutoFollowErrors)
121+
new TreeMap<>(recentAutoFollowErrors),
122+
timesSinceLastAutoFollowPerRemoteCluster
101123
);
102124
}
103125

@@ -146,7 +168,8 @@ void updateAutoFollowers(ClusterState followerClusterState) {
146168

147169
Map<String, AutoFollower> newAutoFollowers = new HashMap<>(newRemoteClusters.size());
148170
for (String remoteCluster : newRemoteClusters) {
149-
AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) {
171+
AutoFollower autoFollower =
172+
new AutoFollower(remoteCluster, this::updateStats, clusterService::state, relativeMillisTimeProvider) {
150173

151174
@Override
152175
void getRemoteClusterState(final String remoteCluster,
@@ -239,20 +262,25 @@ abstract static class AutoFollower {
239262
private final String remoteCluster;
240263
private final Consumer<List<AutoFollowResult>> statsUpdater;
241264
private final Supplier<ClusterState> followerClusterStateSupplier;
265+
private final LongSupplier relativeTimeProvider;
242266

267+
private volatile long lastAutoFollowTimeInMillis = -1;
243268
private volatile long metadataVersion = 0;
244269
private volatile CountDown autoFollowPatternsCountDown;
245270
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
246271

247272
AutoFollower(final String remoteCluster,
248273
final Consumer<List<AutoFollowResult>> statsUpdater,
249-
final Supplier<ClusterState> followerClusterStateSupplier) {
274+
final Supplier<ClusterState> followerClusterStateSupplier,
275+
LongSupplier relativeTimeProvider) {
250276
this.remoteCluster = remoteCluster;
251277
this.statsUpdater = statsUpdater;
252278
this.followerClusterStateSupplier = followerClusterStateSupplier;
279+
this.relativeTimeProvider = relativeTimeProvider;
253280
}
254281

255282
void start() {
283+
lastAutoFollowTimeInMillis = relativeTimeProvider.getAsLong();
256284
final ClusterState clusterState = followerClusterStateSupplier.get();
257285
final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
258286
if (autoFollowMetadata == null) {

0 commit comments

Comments
 (0)