Skip to content

Commit f501c10

Browse files
committed
split up handleClusterAlias() method
1 parent c602caf commit f501c10

File tree

1 file changed

+50
-65
lines changed

1 file changed

+50
-65
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 50 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,19 @@ void autoFollowIndices() {
252252
final int slot = i;
253253
final String clusterAlias = entry.getKey();
254254
final AutoFollowPattern autoFollowPattern = entry.getValue();
255-
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
256255

257256
getLeaderClusterState(autoFollowPattern.getHeaders(), clusterAlias, (leaderClusterState, e) -> {
258257
if (leaderClusterState != null) {
259258
assert e == null;
260-
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
261-
handleClusterAlias(clusterAlias, autoFollowPattern, followedIndices, leaderClusterState, resultHandler);
259+
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
260+
final List<Index> leaderIndicesToFollow =
261+
getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndices);
262+
if (leaderIndicesToFollow.isEmpty()) {
263+
finalise(slot, new AutoFollowResult(clusterAlias));
264+
}else {
265+
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
266+
checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, resultHandler);
267+
}
262268
} else {
263269
finalise(slot, new AutoFollowResult(clusterAlias, e));
264270
}
@@ -267,71 +273,50 @@ void autoFollowIndices() {
267273
}
268274
}
269275

270-
private void handleClusterAlias(
271-
String clusterAlias,
272-
AutoFollowPattern autoFollowPattern,
273-
List<String> followedIndexUUIDs,
274-
ClusterState leaderClusterState,
275-
Consumer<AutoFollowResult> resultHandler
276-
) {
277-
final List<Index> leaderIndicesToFollow =
278-
getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, followerClusterState, followedIndexUUIDs);
279-
if (leaderIndicesToFollow.isEmpty()) {
280-
resultHandler.accept(new AutoFollowResult(clusterAlias));
281-
} else {
282-
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
283-
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
284-
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
285-
final int slot = i;
286-
final Index indexToFollow = leaderIndicesToFollow.get(i);
287-
final String leaderIndexName = indexToFollow.getName();
288-
final String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndexName);
289-
290-
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
291-
clusterAlias + ":" + leaderIndexName;
292-
FollowIndexAction.Request followRequest =
293-
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
294-
autoFollowPattern.getMaxBatchOperationCount(), autoFollowPattern.getMaxConcurrentReadBatches(),
295-
autoFollowPattern.getMaxOperationSizeInBytes(), autoFollowPattern.getMaxConcurrentWriteBatches(),
296-
autoFollowPattern.getMaxWriteBufferSize(), autoFollowPattern.getMaxRetryDelay(),
297-
autoFollowPattern.getIdleShardRetryDelay());
298-
299-
// Execute if the create and follow api call succeeds:
300-
Runnable successHandler = () -> {
301-
LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);
302-
303-
// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
304-
// (so that we do not try to follow it in subsequent auto follow runs)
305-
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
306-
// The coordinator always runs on the elected master node, so we can update cluster state here:
307-
updateAutoFollowMetadata(function, updateError -> {
308-
assert results.get(slot) == null;
309-
if (updateError != null) {
310-
LOGGER.error("Failed to mark leader index [" + leaderIndexName + "] as auto followed", updateError);
311-
results.set(slot, new Tuple<>(indexToFollow, updateError));
312-
} else {
313-
results.set(slot, new Tuple<>(indexToFollow, null));
314-
LOGGER.debug("Successfully marked leader index [{}] as auto followed", leaderIndexName);
315-
}
316-
if (leaderIndicesCountDown.countDown()) {
317-
resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
318-
}
319-
});
320-
};
321-
// Execute if the create and follow apu call fails:
322-
Consumer<Exception> failureHandler = followError -> {
323-
assert followError != null;
324-
LOGGER.warn("Failed to auto follow leader index [" + leaderIndexName + "]", followError);
325-
results.set(slot, new Tuple<>(indexToFollow, followError));
326-
if (leaderIndicesCountDown.countDown()) {
327-
resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
328-
}
329-
};
330-
createAndFollow(autoFollowPattern.getHeaders(), followRequest, successHandler, failureHandler);
331-
}
276+
private void checkAutoFollowPattern(String clusterAlias, AutoFollowPattern autoFollowPattern,
277+
List<Index> leaderIndicesToFollow, Consumer<AutoFollowResult> resultHandler) {
278+
279+
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
280+
final AtomicArray<Tuple<Index, Exception>> results = new AtomicArray<>(leaderIndicesToFollow.size());
281+
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
282+
final Index indexToFollow = leaderIndicesToFollow.get(i);
283+
final int slot = i;
284+
followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, error -> {
285+
results.set(slot, new Tuple<>(indexToFollow, error));
286+
if (leaderIndicesCountDown.countDown()) {
287+
resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
288+
}
289+
});
332290
}
333291
}
334292

293+
private void followLeaderIndex(String clusterAlias, Index indexToFollow,
294+
AutoFollowPattern pattern, Consumer<Exception> onResult) {
295+
final String leaderIndexName = indexToFollow.getName();
296+
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
297+
298+
String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName :
299+
clusterAlias + ":" + leaderIndexName;
300+
FollowIndexAction.Request request =
301+
new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName,
302+
pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(),
303+
pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(),
304+
pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(),
305+
pattern.getIdleShardRetryDelay());
306+
307+
// Execute if the create and follow api call succeeds:
308+
Runnable successHandler = () -> {
309+
LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);
310+
311+
// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
312+
// (so that we do not try to follow it in subsequent auto follow runs)
313+
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
314+
// The coordinator always runs on the elected master node, so we can update cluster state here:
315+
updateAutoFollowMetadata(function, onResult);
316+
};
317+
createAndFollow(pattern.getHeaders(), request, successHandler, onResult);
318+
}
319+
335320
private void finalise(int slot, AutoFollowResult result) {
336321
assert autoFollowResults.get(slot) == null;
337322
autoFollowResults.set(slot, result);

0 commit comments

Comments
 (0)