Skip to content

Allow follower indices to override leader settings #58103

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 18, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
Expand All @@ -32,6 +33,7 @@

public class FollowConfig {

static final ParseField SETTINGS = new ParseField("settings");
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");
static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size");
static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests");
Expand All @@ -49,6 +51,7 @@ public class FollowConfig {
FollowConfig::new);

static {
PARSER.declareObject(FollowConfig::setSettings, (p, c) -> Settings.fromXContent(p), SETTINGS);
PARSER.declareInt(FollowConfig::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareInt(FollowConfig::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareField(
Expand Down Expand Up @@ -81,6 +84,7 @@ static FollowConfig fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private Settings settings = Settings.EMPTY;
private Integer maxReadRequestOperationCount;
private Integer maxOutstandingReadRequests;
private ByteSizeValue maxReadRequestSize;
Expand All @@ -95,6 +99,14 @@ static FollowConfig fromXContent(XContentParser parser) {
FollowConfig() {
}

public Settings getSettings() {
return settings;
}

public void setSettings(final Settings settings) {
this.settings = Objects.requireNonNull(settings);
}

public Integer getMaxReadRequestOperationCount() {
return maxReadRequestOperationCount;
}
Expand Down Expand Up @@ -176,6 +188,13 @@ public void setReadPollTimeout(TimeValue readPollTimeout) {
}

void toXContentFragment(XContentBuilder builder, ToXContent.Params params) throws IOException {
if (settings.isEmpty() == false) {
builder.startObject(SETTINGS.getPreferredName());
{
settings.toXContent(builder, params);
}
builder.endObject();
}
if (maxReadRequestOperationCount != null) {
builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.client.ccr;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
Expand Down Expand Up @@ -98,6 +99,7 @@ public static class Pattern extends FollowConfig {
PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.REMOTE_CLUSTER_FIELD);
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), PutAutoFollowPatternRequest.LEADER_PATTERNS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), PutAutoFollowPatternRequest.FOLLOW_PATTERN_FIELD);
PARSER.declareObject(Pattern::setSettings, (p, c) -> Settings.fromXContent(p), PutAutoFollowPatternRequest.SETTINGS);
PARSER.declareInt(Pattern::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareField(
Pattern::setMaxReadRequestSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -48,6 +50,8 @@
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.test.rest.yaml.ObjectPath;
Expand All @@ -61,6 +65,7 @@

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
Expand All @@ -80,6 +85,7 @@ public void testIndexFollowing() throws Exception {
assertThat(response.isAcknowledged(), is(true));

PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE);
putFollowRequest.setSettings(Settings.builder().put("index.number_of_replicas", 0L).build());
PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
assertThat(putFollowResponse.isFollowIndexCreated(), is(true));
assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true));
Expand Down Expand Up @@ -118,6 +124,13 @@ public void testIndexFollowing() throws Exception {
SearchRequest followerSearchRequest = new SearchRequest("follower");
SearchResponse followerSearchResponse = highLevelClient().search(followerSearchRequest, RequestOptions.DEFAULT);
assertThat(followerSearchResponse.getHits().getTotalHits().value, equalTo(1L));

GetSettingsRequest followerSettingsRequest = new GetSettingsRequest().indices("follower");
GetSettingsResponse followerSettingsResponse =
highLevelClient().indices().getSettings(followerSettingsRequest, RequestOptions.DEFAULT);
assertThat(
IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(followerSettingsResponse.getIndexToSettings().get("follower")),
equalTo(0));
});
} catch (Exception e) {
IndicesFollowStats followStats = ccrClient.getCcrStats(new CcrStatsRequest(), RequestOptions.DEFAULT).getIndicesFollowStats();
Expand Down Expand Up @@ -245,6 +258,10 @@ public void testAutoFollowing() throws Exception {
PutAutoFollowPatternRequest putAutoFollowPatternRequest =
new PutAutoFollowPatternRequest("pattern1", "local_cluster", Collections.singletonList("logs-*"));
putAutoFollowPatternRequest.setFollowIndexNamePattern("copy-{{leader_index}}");
final int followerNumberOfReplicas = randomIntBetween(0, 4);
final Settings autoFollowerPatternSettings =
Settings.builder().put("index.number_of_replicas", followerNumberOfReplicas).build();
putAutoFollowPatternRequest.setSettings(autoFollowerPatternSettings);
AcknowledgedResponse putAutoFollowPatternResponse =
execute(putAutoFollowPatternRequest, ccrClient::putAutoFollowPattern, ccrClient::putAutoFollowPatternAsync);
assertThat(putAutoFollowPatternResponse.isAcknowledged(), is(true));
Expand All @@ -260,6 +277,9 @@ public void testAutoFollowing() throws Exception {
assertThat(ccrStatsResponse.getIndicesFollowStats().getShardFollowStats("copy-logs-20200101"), notNullValue());
});
assertThat(indexExists("copy-logs-20200101"), is(true));
assertThat(
getIndexSettingsAsMap("copy-logs-20200101"),
hasEntry("index.number_of_replicas", Integer.toString(followerNumberOfReplicas)));

GetAutoFollowPatternRequest getAutoFollowPatternRequest =
randomBoolean() ? new GetAutoFollowPatternRequest("pattern1") : new GetAutoFollowPatternRequest();
Expand All @@ -271,6 +291,7 @@ public void testAutoFollowing() throws Exception {
assertThat(pattern.getRemoteCluster(), equalTo(putAutoFollowPatternRequest.getRemoteCluster()));
assertThat(pattern.getLeaderIndexPatterns(), equalTo(putAutoFollowPatternRequest.getLeaderIndexPatterns()));
assertThat(pattern.getFollowIndexNamePattern(), equalTo(putAutoFollowPatternRequest.getFollowIndexNamePattern()));
assertThat(pattern.getSettings(), equalTo(autoFollowerPatternSettings));

// Cleanup:
final DeleteAutoFollowPatternRequest deleteAutoFollowPatternRequest = new DeleteAutoFollowPatternRequest("pattern1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.client.ccr;

import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down Expand Up @@ -47,8 +49,10 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT
NavigableMap<String, AutoFollowMetadata.AutoFollowPattern> patterns = new TreeMap<>();
for (int i = 0; i < numPatterns; i++) {
String remoteCluster = randomAlphaOfLength(4);
List<String> leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4));
List<String> leaderIndexPatterns = Collections.singletonList(randomAlphaOfLength(4));
String followIndexNamePattern = randomAlphaOfLength(4);
final Settings settings =
Settings.builder().put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), randomIntBetween(0, 4)).build();
boolean active = randomBoolean();

Integer maxOutstandingReadRequests = null;
Expand Down Expand Up @@ -91,10 +95,26 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT
if (randomBoolean()) {
readPollTimeout = new TimeValue(randomNonNegativeLong());
}
patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters,
followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize,
maxRetryDelay, readPollTimeout));
patterns.put(
randomAlphaOfLength(4),
new AutoFollowMetadata.AutoFollowPattern(
remoteCluster,
leaderIndexPatterns,
followIndexNamePattern,
settings,
active,
maxReadRequestOperationCount,
maxWriteRequestOperationCount,
maxOutstandingReadRequests,
maxOutstandingWriteRequests,
maxReadRequestSize,
maxWriteRequestSize,
maxWriteBufferCount,
maxWriteBufferSize,
maxRetryDelay,
readPollTimeout
)
);
}
return new GetAutoFollowPatternAction.Response(patterns);
}
Expand All @@ -115,6 +135,7 @@ protected void assertInstances(GetAutoFollowPatternAction.Response serverTestIns
assertThat(serverPattern.getRemoteCluster(), equalTo(clientPattern.getRemoteCluster()));
assertThat(serverPattern.getLeaderIndexPatterns(), equalTo(clientPattern.getLeaderIndexPatterns()));
assertThat(serverPattern.getFollowIndexPattern(), equalTo(clientPattern.getFollowIndexNamePattern()));
assertThat(serverPattern.getSettings(), equalTo(clientPattern.getSettings()));
assertThat(serverPattern.getMaxOutstandingReadRequests(), equalTo(clientPattern.getMaxOutstandingReadRequests()));
assertThat(serverPattern.getMaxOutstandingWriteRequests(), equalTo(clientPattern.getMaxOutstandingWriteRequests()));
assertThat(serverPattern.getMaxReadRequestOperationCount(), equalTo(clientPattern.getMaxReadRequestOperationCount()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before;

Expand Down Expand Up @@ -91,6 +92,9 @@ public void testPutFollow() throws Exception {
"follower", // <3>
ActiveShardCount.ONE // <4>
);
Settings settings =
Settings.builder().put("index.number_of_replicas", 0L).build();
putFollowRequest.setSettings(settings); // <5>
// end::ccr-put-follow-request

// tag::ccr-put-follow-execute
Expand Down Expand Up @@ -484,6 +488,9 @@ public void testPutAutoFollowPattern() throws Exception {
Arrays.asList("logs-*", "metrics-*") // <3>
);
request.setFollowIndexNamePattern("copy-{{leader_index}}"); // <4>
Settings settings =
Settings.builder().put("index.number_of_replicas", 0L).build();
request.setSettings(settings); // <5>
// end::ccr-put-auto-follow-pattern-request

// tag::ccr-put-auto-follow-pattern-execute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ include-tagged::{doc-tests-file}[{api}-request]
<2> The name of the remote cluster.
<3> The leader index patterns.
<4> The pattern used to create the follower index
<5> The settings overrides for the follower index

[id="{upid}-{api}-response"]
==== Response
Expand Down
1 change: 1 addition & 0 deletions docs/java-rest/high-level/ccr/put_follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ include-tagged::{doc-tests-file}[{api}-request]
<3> The name of the follower index that gets created as part of the put follow API call.
<4> The number of active shard copies to wait for before the put follow API returns a
response, as an `ActiveShardCount`
<5> The settings overrides for the follower index.

[id="{upid}-{api}-response"]
==== Response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
"leader_index*"
],
"follow_index_pattern" : "{{leader_index}}-follower",
"settings": {
"index.number_of_replicas": 0
},
"max_read_request_operation_count" : 1024,
"max_outstanding_read_requests" : 16,
"max_read_request_size" : "1024k",
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/ccr/apis/follow-request-body.asciidoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
[testenv="platinum"]
`settings`::
(object) Settings to override from the leader index. Note that certain
settings can not be overrode (e.g., `index.number_of_shards`).

`max_read_request_operation_count`::
(integer) The maximum number of operations to pull per read from the remote
cluster.
Expand Down
3 changes: 3 additions & 0 deletions docs/reference/ccr/apis/follow/put-follow.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ PUT /follower_index/_ccr/follow?wait_for_active_shards=1
{
"remote_cluster" : "remote_cluster",
"leader_index" : "leader_index",
"settings": {
"index.number_of_replicas": 0
},
"max_read_request_operation_count" : 1024,
"max_outstanding_read_requests" : 16,
"max_read_request_size" : "1024k",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@
package org.elasticsearch.xpack.ccr;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.instanceOf;

public class AutoFollowIT extends ESCCRRestTestCase {

Expand Down Expand Up @@ -66,7 +74,27 @@ public void testAutoFollowPatterns() throws Exception {

int initialNumberOfSuccessfulFollowedIndices = getNumberOfSuccessfulFollowedIndices();
Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
request.setJsonEntity("{\"leader_index_patterns\": [\"metrics-*\"], \"remote_cluster\": \"leader_cluster\"}");
final boolean overrideNumberOfReplicas = randomBoolean();
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("metrics-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
if (overrideNumberOfReplicas) {
bodyBuilder.startObject("settings");
{
bodyBuilder.field("index.number_of_replicas", 0);
}
bodyBuilder.endObject();
}
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
assertOK(client().performRequest(request));

try (RestClient leaderClient = buildLeaderClient()) {
Expand All @@ -84,13 +112,57 @@ public void testAutoFollowPatterns() throws Exception {
assertThat(getNumberOfSuccessfulFollowedIndices(), equalTo(initialNumberOfSuccessfulFollowedIndices + 1));
ensureYellow("metrics-20210101");
verifyDocuments("metrics-20210101", 5, "filtered_field:true");
if (overrideNumberOfReplicas) {
assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "0"));
} else {
assertThat(getIndexSettingsAsMap("metrics-20210101"), hasEntry("index.number_of_replicas", "1"));
}
});
assertBusy(() -> {
verifyCcrMonitoring("metrics-20210101", "metrics-20210101");
verifyAutoFollowMonitoring();
}, 30, TimeUnit.SECONDS);
}

public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
return;
}

final Request request = new Request("PUT", "/_ccr/auto_follow/test_pattern");
try (XContentBuilder bodyBuilder = JsonXContent.contentBuilder()) {
bodyBuilder.startObject();
{
bodyBuilder.startArray("leader_index_patterns");
{
bodyBuilder.value("metrics-*");
}
bodyBuilder.endArray();
bodyBuilder.field("remote_cluster", "leader_cluster");
bodyBuilder.startObject("settings");
{
bodyBuilder.field("index.number_of_shards", 5);
}
bodyBuilder.endObject();
}
bodyBuilder.endObject();
request.setJsonEntity(Strings.toString(bodyBuilder));
}
final ResponseException responseException = expectThrows(ResponseException.class, () -> client().performRequest(request));
final Response response = responseException.getResponse();
assertThat(response.getStatusLine().getStatusCode(), equalTo(400));
final Map<String, Object> responseAsMap = entityAsMap(response);
assertThat(responseAsMap, hasKey("error"));
assertThat(responseAsMap.get("error"), instanceOf(Map.class));
@SuppressWarnings("unchecked") final Map<Object, Object> error = (Map<Object, Object>) responseAsMap.get("error");
assertThat(error, hasEntry("type", "illegal_argument_exception"));
assertThat(
error,
hasEntry("reason", "can not put auto-follow pattern that could override leader settings {\"index.number_of_shards\":\"5\"}")
);
}

private int getNumberOfSuccessfulFollowedIndices() throws IOException {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
Expand Down
Loading