Skip to content

Commit 2a4bac1

Browse files
committed
address review comments
1 parent 11e485d commit 2a4bac1

File tree

13 files changed

+82
-76
lines changed

13 files changed

+82
-76
lines changed

core/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ public void readFrom(StreamInput in) throws IOException {
360360
shardFailures[i] = readShardSearchFailure(in);
361361
}
362362
}
363-
if (!in.getVersion().before(Version.V_6_1_0)) {
363+
//TODO update version once backported
364+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
364365
clusters = new Clusters(in);
365366
}
366367
scrollId = in.readOptionalString();
@@ -381,7 +382,8 @@ public void writeTo(StreamOutput out) throws IOException {
381382
for (ShardSearchFailure shardSearchFailure : shardFailures) {
382383
shardSearchFailure.writeTo(out);
383384
}
384-
if (!out.getVersion().before(Version.V_6_1_0)) {
385+
//TODO update version once backported
386+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
385387
clusters.writeTo(out);
386388
}
387389
out.writeOptionalString(scrollId);
@@ -414,11 +416,13 @@ public static class Clusters implements ToXContent, Writeable {
414416
private final int skipped;
415417

416418
Clusters(int total, int successful, int skipped) {
419+
assert total >= 0 && successful >= 0 && skipped >= 0
420+
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
421+
assert successful <= total && skipped == total - successful
422+
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
417423
this.total = total;
418424
this.successful = successful;
419425
this.skipped = skipped;
420-
assert successful <= total && skipped == total - successful
421-
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
422426
}
423427

424428
private Clusters(StreamInput in) throws IOException {

core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ public void apply(Settings value, Settings current, Settings previous) {
262262
SearchService.DEFAULT_SEARCH_TIMEOUT_SETTING,
263263
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
264264
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
265-
RemoteClusterAware.REMOTE_CLUSTER_SKIP_IF_DISCONNECTED,
265+
RemoteClusterAware.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
266266
RemoteClusterAware.REMOTE_CLUSTERS_SEEDS,
267267
RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER,
268268
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING,

core/src/main/java/org/elasticsearch/transport/RemoteClusterAware.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public abstract class RemoteClusterAware extends AbstractComponent {
5656
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
5757
public static final String LOCAL_CLUSTER_GROUP_KEY = "";
5858

59-
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_SKIP_IF_DISCONNECTED =
60-
Setting.affixKeySetting("search.remote.", "skip_if_disconnected",
59+
public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_SKIP_UNAVAILABLE =
60+
Setting.affixKeySetting("search.remote.", "skip_unavailable",
6161
key -> boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic));
6262

6363
protected final ClusterNameExpressionResolver clusterNameResolver;

core/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
8888
private final int maxNumRemoteConnections;
8989
private final Predicate<DiscoveryNode> nodePredicate;
9090
private volatile List<DiscoveryNode> seedNodes;
91-
private volatile boolean skipIfDisconnected;
91+
private volatile boolean skipUnavailable;
9292
private final ConnectHandler connectHandler;
9393
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
9494

@@ -119,7 +119,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
119119
remoteProfile = builder.build();
120120
connectedNodes = new ConnectedNodes(clusterAlias);
121121
this.seedNodes = Collections.unmodifiableList(seedNodes);
122-
this.skipIfDisconnected = RemoteClusterAware.REMOTE_CLUSTER_SKIP_IF_DISCONNECTED
122+
this.skipUnavailable = RemoteClusterAware.REMOTE_CLUSTER_SKIP_UNAVAILABLE
123123
.getConcreteSettingForNamespace(clusterAlias).get(settings);
124124
this.connectHandler = new ConnectHandler();
125125
transportService.addConnectionListener(this);
@@ -134,10 +134,10 @@ synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<
134134
}
135135

136136
/**
137-
* Updates the skipIfDisconnected flag that can be dynamically set for each remote cluster
137+
* Updates the skipUnavailable flag that can be dynamically set for each remote cluster
138138
*/
139-
synchronized void updateSkipIfDisconnected(boolean skipIfDisconnected) {
140-
this.skipIfDisconnected = skipIfDisconnected;
139+
void updateSkipUnavailable(boolean skipUnavailable) {
140+
this.skipUnavailable = skipUnavailable;
141141
}
142142

143143
@Override
@@ -157,15 +157,15 @@ public void fetchSearchShards(ClusterSearchShardsRequest searchRequest,
157157

158158
final ActionListener<ClusterSearchShardsResponse> searchShardsListener;
159159
final Consumer<Exception> onConnectFailure;
160-
if (skipIfDisconnected) {
160+
if (skipUnavailable) {
161161
onConnectFailure = (exception) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY);
162162
searchShardsListener = ActionListener.wrap(listener::onResponse, (e) -> listener.onResponse(ClusterSearchShardsResponse.EMPTY));
163163
} else {
164164
onConnectFailure = listener::onFailure;
165165
searchShardsListener = listener;
166166
}
167167
// in case we have no connected nodes we try to connect and if we fail we either notify the listener or not depending on
168-
// the skip_if_disconnected setting
168+
// the skip_unavailable setting
169169
ensureConnected(ActionListener.wrap((x) -> fetchShardsInternal(searchRequest, searchShardsListener), onConnectFailure));
170170
}
171171

@@ -610,7 +610,7 @@ public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
610610
// not connected we return immediately
611611
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
612612
Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0,
613-
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipIfDisconnected);
613+
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
614614
listener.onResponse(remoteConnectionStats);
615615
} else {
616616
NodesInfoRequest request = new NodesInfoRequest();
@@ -646,7 +646,7 @@ public void handleResponse(NodesInfoResponse response) {
646646
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
647647
seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
648648
maxNumRemoteConnections, connectedNodes.size(),
649-
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipIfDisconnected);
649+
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
650650
listener.onResponse(remoteConnectionInfo);
651651
}
652652

core/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -287,20 +287,20 @@ protected Set<String> getRemoteClusterNames() {
287287
@Override
288288
public void listenForUpdates(ClusterSettings clusterSettings) {
289289
super.listenForUpdates(clusterSettings);
290-
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTER_SKIP_IF_DISCONNECTED, this::updateSkipIfDisconnected,
290+
clusterSettings.addAffixUpdateConsumer(RemoteClusterAware.REMOTE_CLUSTER_SKIP_UNAVAILABLE, this::updateSkipUnavailable,
291291
(clusterAlias, value) -> {});
292292
}
293293

294-
synchronized void updateSkipIfDisconnected(String clusterAlias, Boolean skipIfDisconnected) {
294+
synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
295295
RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
296296
//assert remote != null : "remote cluster [" + clusterAlias + "] not registered";
297-
//this happens if skip_if_disconnected is provided for a remote cluster that is not registered.
298-
//for instance if you set seeds to null in the same call where you set skip_if_disconnected to null, the latter will not find
297+
//this happens if skip_unavaialble is provided for a remote cluster that is not registered.
298+
//for instance if you set seeds to null in the same call where you set skip_unavailable to null, the latter will not find
299299
//the remote cluster.
300300
//TODO it should be possible to remove the if and uncomment the assertion above once we accept
301-
//the skip_if_disconnected setting only for registered remote clusters.
301+
//the skip_unavailable setting only for registered remote clusters.
302302
if (remote != null) {
303-
remote.updateSkipIfDisconnected(skipIfDisconnected);
303+
remote.updateSkipUnavailable(skipUnavailable);
304304
}
305305
}
306306

core/src/main/java/org/elasticsearch/transport/RemoteConnectionInfo.java

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,19 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
4242
final TimeValue initialConnectionTimeout;
4343
final int numNodesConnected;
4444
final String clusterAlias;
45-
final boolean skipIfDisconnected;
45+
final boolean skipUnavailable;
4646

4747
RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
4848
List<TransportAddress> httpAddresses,
4949
int connectionsPerCluster, int numNodesConnected,
50-
TimeValue initialConnectionTimeout, boolean skipIfDisconnected) {
50+
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
5151
this.clusterAlias = clusterAlias;
5252
this.seedNodes = seedNodes;
5353
this.httpAddresses = httpAddresses;
5454
this.connectionsPerCluster = connectionsPerCluster;
5555
this.numNodesConnected = numNodesConnected;
5656
this.initialConnectionTimeout = initialConnectionTimeout;
57-
this.skipIfDisconnected = skipIfDisconnected;
57+
this.skipUnavailable = skipUnavailable;
5858
}
5959

6060
public RemoteConnectionInfo(StreamInput input) throws IOException {
@@ -64,10 +64,11 @@ public RemoteConnectionInfo(StreamInput input) throws IOException {
6464
initialConnectionTimeout = new TimeValue(input);
6565
numNodesConnected = input.readVInt();
6666
clusterAlias = input.readString();
67-
if (input.getVersion().before(Version.V_6_1_0)) {
68-
skipIfDisconnected = false;
67+
//TODO update version once backported
68+
if (input.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
69+
skipUnavailable = false;
6970
} else {
70-
skipIfDisconnected = input.readBoolean();
71+
skipUnavailable = input.readBoolean();
7172
}
7273
}
7374

@@ -89,7 +90,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
8990
builder.field("num_nodes_connected", numNodesConnected);
9091
builder.field("max_connections_per_cluster", connectionsPerCluster);
9192
builder.field("initial_connect_timeout", initialConnectionTimeout);
92-
builder.field("skip_if_disconnected", skipIfDisconnected);
93+
builder.field("skip_unavailable", skipUnavailable);
9394
}
9495
builder.endObject();
9596
return builder;
@@ -103,8 +104,9 @@ public void writeTo(StreamOutput out) throws IOException {
103104
initialConnectionTimeout.writeTo(out);
104105
out.writeVInt(numNodesConnected);
105106
out.writeString(clusterAlias);
106-
if (!out.getVersion().before(Version.V_6_1_0)) {
107-
out.writeBoolean(skipIfDisconnected);
107+
//TODO update version once backported
108+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
109+
out.writeBoolean(skipUnavailable);
108110
}
109111
}
110112

@@ -119,12 +121,12 @@ public boolean equals(Object o) {
119121
Objects.equals(httpAddresses, that.httpAddresses) &&
120122
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
121123
Objects.equals(clusterAlias, that.clusterAlias) &&
122-
skipIfDisconnected == that.skipIfDisconnected;
124+
skipUnavailable == that.skipUnavailable;
123125
}
124126

125127
@Override
126128
public int hashCode() {
127129
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout,
128-
numNodesConnected, clusterAlias, skipIfDisconnected);
130+
numNodesConnected, clusterAlias, skipUnavailable);
129131
}
130132
}

core/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ public void testFetchShards() throws Exception {
376376
updateSeedNodes(connection, nodes);
377377
}
378378
if (randomBoolean()) {
379-
connection.updateSkipIfDisconnected(randomBoolean());
379+
connection.updateSkipUnavailable(randomBoolean());
380380
}
381381
SearchRequest request = new SearchRequest("test-index");
382382
CountDownLatch responseLatch = new CountDownLatch(1);
@@ -398,7 +398,7 @@ public void testFetchShards() throws Exception {
398398
}
399399
}
400400

401-
public void testFetchShardsSkipIfDisconnected() throws Exception {
401+
public void testFetchShardsSkipUnavailable() throws Exception {
402402
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
403403
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT)) {
404404
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
@@ -440,7 +440,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
440440
service.addFailToSendNoConnectRule(seedTransport);
441441

442442
if (randomBoolean()) {
443-
connection.updateSkipIfDisconnected(false);
443+
connection.updateSkipUnavailable(false);
444444
}
445445
{
446446
CountDownLatch responseLatch = new CountDownLatch(1);
@@ -454,7 +454,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
454454
assertThat(failReference.get(), instanceOf(TransportException.class));
455455
}
456456

457-
connection.updateSkipIfDisconnected(true);
457+
connection.updateSkipUnavailable(true);
458458
{
459459
CountDownLatch responseLatch = new CountDownLatch(1);
460460
AtomicReference<ClusterSearchShardsResponse> reference = new AtomicReference<>();
@@ -473,7 +473,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
473473
assertTrue(disconnectedLatch.await(1, TimeUnit.SECONDS));
474474

475475
if (randomBoolean()) {
476-
connection.updateSkipIfDisconnected(false);
476+
connection.updateSkipUnavailable(false);
477477
}
478478

479479
service.clearAllRules();
@@ -815,7 +815,7 @@ public void testRenderConnectionInfoXContent() throws IOException {
815815
builder.endObject();
816816
assertEquals("{\"test_cluster\":{\"seeds\":[\"0.0.0.0:1\"],\"http_addresses\":[\"0.0.0.0:80\"],\"connected\":true," +
817817
"\"num_nodes_connected\":3,\"max_connections_per_cluster\":4,\"initial_connect_timeout\":\"30m\"," +
818-
"\"skip_if_disconnected\":true}}", builder.string());
818+
"\"skip_unavailable\":true}}", builder.string());
819819

820820
stats = new RemoteConnectionInfo("some_other_cluster",
821821
Arrays.asList(new TransportAddress(TransportAddress.META_ADDRESS,1), new TransportAddress(TransportAddress.META_ADDRESS,2)),
@@ -828,7 +828,7 @@ public void testRenderConnectionInfoXContent() throws IOException {
828828
builder.endObject();
829829
assertEquals("{\"some_other_cluster\":{\"seeds\":[\"0.0.0.0:1\",\"0.0.0.0:2\"],\"http_addresses\":[\"0.0.0.0:80\",\"0.0.0.0:81\"],"
830830
+ "\"connected\":false,\"num_nodes_connected\":0,\"max_connections_per_cluster\":2,\"initial_connect_timeout\":\"30s\"," +
831-
"\"skip_if_disconnected\":false}}", builder.string());
831+
"\"skip_unavailable\":false}}", builder.string());
832832
}
833833

834834
private RemoteConnectionInfo getRemoteConnectionInfo(RemoteClusterConnection connection) throws Exception {

core/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private MockTransportService startTransport(
8181

8282
public void testSettingsAreRegistered() {
8383
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTERS_SEEDS));
84-
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTER_SKIP_IF_DISCONNECTED));
84+
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterAware.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
8585
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CONNECTIONS_PER_CLUSTER));
8686
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
8787
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_NODE_ATTRIBUTE));
@@ -524,9 +524,9 @@ public void onNodeDisconnected(DiscoveryNode node) {
524524
assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster"));
525525
}
526526

527-
//setting skip_if_disconnected to true for all the disconnected clusters will make the request succeed again
527+
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again
528528
for (int i : disconnectedNodesIndices) {
529-
remoteClusterService.updateSkipIfDisconnected("remote" + i, true);
529+
remoteClusterService.updateSkipUnavailable("remote" + i, true);
530530
}
531531
{
532532
final CountDownLatch latch = new CountDownLatch(1);
@@ -559,7 +559,7 @@ public void onNodeDisconnected(DiscoveryNode node) {
559559
if (randomBoolean()) {
560560
for (int i : disconnectedNodesIndices) {
561561
if (randomBoolean()) {
562-
remoteClusterService.updateSkipIfDisconnected("remote" + i, true);
562+
remoteClusterService.updateSkipUnavailable("remote" + i, true);
563563
}
564564

565565
}
@@ -591,17 +591,17 @@ public void onNodeDisconnected(DiscoveryNode node) {
591591
}
592592
}
593593

594-
public void testRemoteClusterSkipIfDisconnectedSetting() {
595-
//TODO this should be changed so that skip_if_disconnected can only be set for registered clusters with at least one seed
594+
public void testRemoteClusterSkipUnavailableSetting() {
595+
//TODO this should be changed so that skip_unavailable can only be set for registered clusters with at least one seed
596596
Settings settings = Settings.builder()
597-
.put("search.remote.foo.skip_if_disconnected", true)
598-
.put("search.remote.bar.skip_if_disconnected", false).build();
599-
RemoteClusterAware.REMOTE_CLUSTER_SKIP_IF_DISCONNECTED.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
597+
.put("search.remote.foo.skip_unavailable", true)
598+
.put("search.remote.bar.skip_unavailable", false).build();
599+
RemoteClusterAware.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(settings).forEach(setting -> setting.get(settings));
600600

601601
Settings brokenSettings = Settings.builder()
602-
.put("search.remote.foo.skip_if_disconnected", "broken").build();
602+
.put("search.remote.foo.skip_unavailable", "broken").build();
603603
expectThrows(IllegalArgumentException.class, () ->
604-
RemoteClusterAware.REMOTE_CLUSTER_SKIP_IF_DISCONNECTED.getAllConcreteSettings(brokenSettings)
604+
RemoteClusterAware.REMOTE_CLUSTER_SKIP_UNAVAILABLE.getAllConcreteSettings(brokenSettings)
605605
.forEach(setting -> setting.get(brokenSettings)));
606606
}
607607
}

0 commit comments

Comments
 (0)