Skip to content

[CCR] Add ccr.auto_follow_coordinator.wait_for_timeout setting #36714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public Collection<Object> createComponents(
ccrLicenseChecker,
restoreSourceService,
new CcrRepositoryManager(settings, clusterService, client),
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
new AutoFollowCoordinator(settings, client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.XPackSettings;

import java.util.Arrays;
Expand All @@ -28,6 +29,12 @@ private CcrSettings() {
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);

/**
* The settings defined by CCR.
*
Expand All @@ -36,7 +43,8 @@ private CcrSettings() {
static List<Setting<?>> getSettings() {
return Arrays.asList(
XPackSettings.CCR_ENABLED_SETTING,
CCR_FOLLOWING_INDEX_SETTING);
CCR_FOLLOWING_INDEX_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.elasticsearch.common.collect.CopyOnWriteHashMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
Expand Down Expand Up @@ -72,6 +74,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final CcrLicenseChecker ccrLicenseChecker;
private final LongSupplier relativeMillisTimeProvider;

private volatile TimeValue waitForMetadataTimeOut;
private volatile Map<String, AutoFollower> autoFollowers = Collections.emptyMap();

// The following fields are read and updated under a lock:
Expand All @@ -81,6 +84,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private final LinkedHashMap<String, ElasticsearchException> recentAutoFollowErrors;

public AutoFollowCoordinator(
Settings settings,
Client client,
ClusterService clusterService,
CcrLicenseChecker ccrLicenseChecker,
Expand All @@ -97,6 +101,15 @@ protected boolean removeEldestEntry(final Map.Entry<String, ElasticsearchExcepti
return size() > MAX_AUTO_FOLLOW_ERRORS;
}
};

Consumer<TimeValue> updater = newWaitForTimeOut -> {
if (newWaitForTimeOut.equals(waitForMetadataTimeOut) == false) {
LOGGER.info("changing wait_for_metadata_timeout from [{}] to [{}]", waitForMetadataTimeOut, newWaitForTimeOut);
waitForMetadataTimeOut = newWaitForTimeOut;
}
};
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater);
waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings);
}

public synchronized AutoFollowStats getStats() {
Expand Down Expand Up @@ -180,6 +193,7 @@ void getRemoteClusterState(final String remoteCluster,
request.metaData(true);
request.routingTable(true);
request.waitForMetaDataVersion(metadataVersion);
request.waitForTimeout(waitForMetadataTimeOut);
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngine;
import org.elasticsearch.xpack.core.XPackSettings;
Expand Down Expand Up @@ -199,6 +200,8 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
if (leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
Expand Down Expand Up @@ -42,6 +43,8 @@ protected Settings nodeSettings() {
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
Expand Down Expand Up @@ -530,8 +532,9 @@ public void testGetFollowerIndexName() {

public void testStats() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
mock(ClusterService.class),
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);

Expand Down Expand Up @@ -586,14 +589,15 @@ public void testStats() {
}

public void testUpdateAutoFollowers() {
ClusterService clusterService = mock(ClusterService.class);
ClusterService clusterService = mockClusterService();
// Return a cluster state with no patterns so that the auto followers never really execute:
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())))
.build();
when(clusterService.state()).thenReturn(followerState);
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
clusterService,
new CcrLicenseChecker(() -> true, () -> false),
Expand Down Expand Up @@ -648,8 +652,9 @@ public void testUpdateAutoFollowers() {

public void testUpdateAutoFollowersNoPatterns() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
mock(ClusterService.class),
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
Expand All @@ -662,8 +667,9 @@ public void testUpdateAutoFollowersNoPatterns() {

public void testUpdateAutoFollowersNoAutoFollowMetadata() {
AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator(
Settings.EMPTY,
null,
mock(ClusterService.class),
mockClusterService(),
new CcrLicenseChecker(() -> true, () -> false),
() -> 1L);
ClusterState clusterState = ClusterState.builder(new ClusterName("remote")).build();
Expand Down Expand Up @@ -840,4 +846,12 @@ private static Supplier<ClusterState> localClusterStateSupplier(ClusterState...
};
}

private ClusterService mockClusterService() {
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings =
new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT));
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
return clusterService;
}

}