Skip to content

Commit 2c6437b

Browse files
committed
Return transport addresses from UnicastHostsProvider (#31426)
With #20695 we removed local transport and there is just TransportAddress now. The UnicastHostsProvider currently returns DiscoveryNode instances, where, during pinging, we're actually only making use of the TransportAddress to establish a first connection to the possible new node. To simplify the interface, we can just return a list of transport addresses instead, which means that it's not necessary anymore to create fake node objects in each plugin just to return the address information.
1 parent 71b3ac2 commit 2c6437b

File tree

12 files changed

+175
-220
lines changed

12 files changed

+175
-220
lines changed

plugins/discovery-azure-classic/src/main/java/org/elasticsearch/discovery/azure/classic/AzureUnicastHostsProvider.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,10 @@
2424
import com.microsoft.windowsazure.management.compute.models.HostedServiceGetDetailedResponse;
2525
import com.microsoft.windowsazure.management.compute.models.InstanceEndpoint;
2626
import com.microsoft.windowsazure.management.compute.models.RoleInstance;
27-
import org.elasticsearch.Version;
2827
import org.elasticsearch.cloud.azure.classic.AzureServiceDisableException;
2928
import org.elasticsearch.cloud.azure.classic.AzureServiceRemoteException;
3029
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService;
3130
import org.elasticsearch.cloud.azure.classic.management.AzureComputeService.Discovery;
32-
import org.elasticsearch.cluster.node.DiscoveryNode;
3331
import org.elasticsearch.common.Strings;
3432
import org.elasticsearch.common.component.AbstractComponent;
3533
import org.elasticsearch.common.network.InetAddresses;
@@ -47,9 +45,6 @@
4745
import java.util.ArrayList;
4846
import java.util.List;
4947

50-
import static java.util.Collections.emptyMap;
51-
import static java.util.Collections.emptySet;
52-
5348
public class AzureUnicastHostsProvider extends AbstractComponent implements UnicastHostsProvider {
5449

5550
public enum HostType {
@@ -104,7 +99,7 @@ public static Deployment fromString(String string) {
10499

105100
private final TimeValue refreshInterval;
106101
private long lastRefresh;
107-
private List<DiscoveryNode> cachedDiscoNodes;
102+
private List<TransportAddress> dynamicHosts;
108103
private final HostType hostType;
109104
private final String publicEndpointName;
110105
private final String deploymentName;
@@ -137,30 +132,30 @@ public AzureUnicastHostsProvider(Settings settings, AzureComputeService azureCom
137132
* Setting `cloud.azure.refresh_interval` to `0` will disable caching (default).
138133
*/
139134
@Override
140-
public List<DiscoveryNode> buildDynamicNodes() {
135+
public List<TransportAddress> buildDynamicHosts() {
141136
if (refreshInterval.millis() != 0) {
142-
if (cachedDiscoNodes != null &&
137+
if (dynamicHosts != null &&
143138
(refreshInterval.millis() < 0 || (System.currentTimeMillis() - lastRefresh) < refreshInterval.millis())) {
144139
logger.trace("using cache to retrieve node list");
145-
return cachedDiscoNodes;
140+
return dynamicHosts;
146141
}
147142
lastRefresh = System.currentTimeMillis();
148143
}
149144
logger.debug("start building nodes list using Azure API");
150145

151-
cachedDiscoNodes = new ArrayList<>();
146+
dynamicHosts = new ArrayList<>();
152147

153148
HostedServiceGetDetailedResponse detailed;
154149
try {
155150
detailed = azureComputeService.getServiceDetails();
156151
} catch (AzureServiceDisableException e) {
157152
logger.debug("Azure discovery service has been disabled. Returning empty list of nodes.");
158-
return cachedDiscoNodes;
153+
return dynamicHosts;
159154
} catch (AzureServiceRemoteException e) {
160155
// We got a remote exception
161156
logger.warn("can not get list of azure nodes: [{}]. Returning empty list of nodes.", e.getMessage());
162157
logger.trace("AzureServiceRemoteException caught", e);
163-
return cachedDiscoNodes;
158+
return dynamicHosts;
164159
}
165160

166161
InetAddress ipAddress = null;
@@ -212,18 +207,17 @@ public List<DiscoveryNode> buildDynamicNodes() {
212207
TransportAddress[] addresses = transportService.addressesFromString(networkAddress, 1);
213208
for (TransportAddress address : addresses) {
214209
logger.trace("adding {}, transport_address {}", networkAddress, address);
215-
cachedDiscoNodes.add(new DiscoveryNode("#cloud-" + instance.getInstanceName(), address, emptyMap(),
216-
emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
210+
dynamicHosts.add(address);
217211
}
218212
} catch (Exception e) {
219213
logger.warn("can not convert [{}] to transport address. skipping. [{}]", networkAddress, e.getMessage());
220214
}
221215
}
222216
}
223217

224-
logger.debug("{} node(s) added", cachedDiscoNodes.size());
218+
logger.debug("{} addresses added", dynamicHosts.size());
225219

226-
return cachedDiscoNodes;
220+
return dynamicHosts;
227221
}
228222

229223
protected String resolveInstanceAddress(final HostType hostType, final RoleInstance instance) {

plugins/discovery-ec2/src/main/java/org/elasticsearch/discovery/ec2/AwsEc2UnicastHostsProvider.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import com.amazonaws.services.ec2.model.Tag;
3030
import org.apache.logging.log4j.message.ParameterizedMessage;
3131
import org.apache.logging.log4j.util.Supplier;
32-
import org.elasticsearch.Version;
33-
import org.elasticsearch.cluster.node.DiscoveryNode;
3432
import org.elasticsearch.common.component.AbstractComponent;
3533
import org.elasticsearch.common.settings.Settings;
3634
import org.elasticsearch.common.transport.TransportAddress;
@@ -46,8 +44,6 @@
4644
import java.util.Set;
4745

4846
import static java.util.Collections.disjoint;
49-
import static java.util.Collections.emptyMap;
50-
import static java.util.Collections.emptySet;
5147
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.TAG_PREFIX;
5248
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_DNS;
5349
import static org.elasticsearch.discovery.ec2.AwsEc2Service.HostType.PRIVATE_IP;
@@ -70,15 +66,15 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
7066

7167
private final String hostType;
7268

73-
private final DiscoNodesCache discoNodes;
69+
private final TransportAddressesCache dynamicHosts;
7470

7571
AwsEc2UnicastHostsProvider(Settings settings, TransportService transportService, AwsEc2Service awsEc2Service) {
7672
super(settings);
7773
this.transportService = transportService;
7874
this.awsEc2Service = awsEc2Service;
7975

8076
this.hostType = AwsEc2Service.HOST_TYPE_SETTING.get(settings);
81-
this.discoNodes = new DiscoNodesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
77+
this.dynamicHosts = new TransportAddressesCache(AwsEc2Service.NODE_CACHE_TIME_SETTING.get(settings));
8278

8379
this.bindAnyGroup = AwsEc2Service.ANY_GROUP_SETTING.get(settings);
8480
this.groups = new HashSet<>();
@@ -96,13 +92,13 @@ class AwsEc2UnicastHostsProvider extends AbstractComponent implements UnicastHos
9692
}
9793

9894
@Override
99-
public List<DiscoveryNode> buildDynamicNodes() {
100-
return discoNodes.getOrRefresh();
95+
public List<TransportAddress> buildDynamicHosts() {
96+
return dynamicHosts.getOrRefresh();
10197
}
10298

103-
protected List<DiscoveryNode> fetchDynamicNodes() {
99+
protected List<TransportAddress> fetchDynamicNodes() {
104100

105-
final List<DiscoveryNode> discoNodes = new ArrayList<>();
101+
final List<TransportAddress> dynamicHosts = new ArrayList<>();
106102

107103
final DescribeInstancesResult descInstances;
108104
try (AmazonEc2Reference clientReference = awsEc2Service.client()) {
@@ -115,7 +111,7 @@ protected List<DiscoveryNode> fetchDynamicNodes() {
115111
} catch (final AmazonClientException e) {
116112
logger.info("Exception while retrieving instance list from AWS API: {}", e.getMessage());
117113
logger.debug("Full exception:", e);
118-
return discoNodes;
114+
return dynamicHosts;
119115
}
120116

121117
logger.trace("building dynamic unicast discovery nodes...");
@@ -179,8 +175,7 @@ && disjoint(securityGroupIds, groups)) {
179175
final TransportAddress[] addresses = transportService.addressesFromString(address, 1);
180176
for (int i = 0; i < addresses.length; i++) {
181177
logger.trace("adding {}, address {}, transport_address {}", instance.getInstanceId(), address, addresses[i]);
182-
discoNodes.add(new DiscoveryNode(instance.getInstanceId(), "#cloud-" + instance.getInstanceId() + "-" + i,
183-
addresses[i], emptyMap(), emptySet(), Version.CURRENT.minimumCompatibilityVersion()));
178+
dynamicHosts.add(addresses[i]);
184179
}
185180
} catch (final Exception e) {
186181
final String finalAddress = address;
@@ -194,9 +189,9 @@ && disjoint(securityGroupIds, groups)) {
194189
}
195190
}
196191

197-
logger.debug("using dynamic discovery nodes {}", discoNodes);
192+
logger.debug("using dynamic transport addresses {}", dynamicHosts);
198193

199-
return discoNodes;
194+
return dynamicHosts;
200195
}
201196

202197
private DescribeInstancesRequest buildDescribeInstancesRequest() {
@@ -222,11 +217,11 @@ private DescribeInstancesRequest buildDescribeInstancesRequest() {
222217
return describeInstancesRequest;
223218
}
224219

225-
private final class DiscoNodesCache extends SingleObjectCache<List<DiscoveryNode>> {
220+
private final class TransportAddressesCache extends SingleObjectCache<List<TransportAddress>> {
226221

227222
private boolean empty = true;
228223

229-
protected DiscoNodesCache(TimeValue refreshInterval) {
224+
protected TransportAddressesCache(TimeValue refreshInterval) {
230225
super(refreshInterval, new ArrayList<>());
231226
}
232227

@@ -236,8 +231,8 @@ protected boolean needsRefresh() {
236231
}
237232

238233
@Override
239-
protected List<DiscoveryNode> refresh() {
240-
final List<DiscoveryNode> nodes = fetchDynamicNodes();
234+
protected List<TransportAddress> refresh() {
235+
final List<TransportAddress> nodes = fetchDynamicNodes();
241236
empty = nodes.isEmpty();
242237
return nodes;
243238
}

plugins/discovery-ec2/src/test/java/org/elasticsearch/discovery/ec2/Ec2DiscoveryTests.java

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.amazonaws.services.ec2.model.Tag;
2323
import org.elasticsearch.Version;
24-
import org.elasticsearch.cluster.node.DiscoveryNode;
2524
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2625
import org.elasticsearch.common.network.NetworkService;
2726
import org.elasticsearch.common.settings.Settings;
@@ -87,16 +86,16 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
8786
null);
8887
}
8988

90-
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes) {
91-
return buildDynamicNodes(nodeSettings, nodes, null);
89+
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes) {
90+
return buildDynamicHosts(nodeSettings, nodes, null);
9291
}
9392

94-
protected List<DiscoveryNode> buildDynamicNodes(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
93+
protected List<TransportAddress> buildDynamicHosts(Settings nodeSettings, int nodes, List<List<Tag>> tagsList) {
9594
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY, nodes, tagsList)) {
9695
AwsEc2UnicastHostsProvider provider = new AwsEc2UnicastHostsProvider(nodeSettings, transportService, plugin.ec2Service);
97-
List<DiscoveryNode> discoveryNodes = provider.buildDynamicNodes();
98-
logger.debug("--> nodes found: {}", discoveryNodes);
99-
return discoveryNodes;
96+
List<TransportAddress> dynamicHosts = provider.buildDynamicHosts();
97+
logger.debug("--> addresses found: {}", dynamicHosts);
98+
return dynamicHosts;
10099
} catch (IOException e) {
101100
fail("Unexpected IOException");
102101
return null;
@@ -107,7 +106,7 @@ public void testDefaultSettings() throws InterruptedException {
107106
int nodes = randomInt(10);
108107
Settings nodeSettings = Settings.builder()
109108
.build();
110-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
109+
List<TransportAddress> discoveryNodes = buildDynamicHosts(nodeSettings, nodes);
111110
assertThat(discoveryNodes, hasSize(nodes));
112111
}
113112

@@ -119,12 +118,11 @@ public void testPrivateIp() throws InterruptedException {
119118
Settings nodeSettings = Settings.builder()
120119
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_ip")
121120
.build();
122-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
123-
assertThat(discoveryNodes, hasSize(nodes));
121+
List<TransportAddress> transportAddresses = buildDynamicHosts(nodeSettings, nodes);
122+
assertThat(transportAddresses, hasSize(nodes));
124123
// We check that we are using here expected address
125124
int node = 1;
126-
for (DiscoveryNode discoveryNode : discoveryNodes) {
127-
TransportAddress address = discoveryNode.getAddress();
125+
for (TransportAddress address : transportAddresses) {
128126
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PRIVATE_IP + node++);
129127
assertEquals(address, expected);
130128
}
@@ -138,12 +136,11 @@ public void testPublicIp() throws InterruptedException {
138136
Settings nodeSettings = Settings.builder()
139137
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_ip")
140138
.build();
141-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
142-
assertThat(discoveryNodes, hasSize(nodes));
139+
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
140+
assertThat(dynamicHosts, hasSize(nodes));
143141
// We check that we are using here expected address
144142
int node = 1;
145-
for (DiscoveryNode discoveryNode : discoveryNodes) {
146-
TransportAddress address = discoveryNode.getAddress();
143+
for (TransportAddress address : dynamicHosts) {
147144
TransportAddress expected = poorMansDNS.get(AmazonEC2Mock.PREFIX_PUBLIC_IP + node++);
148145
assertEquals(address, expected);
149146
}
@@ -159,13 +156,12 @@ public void testPrivateDns() throws InterruptedException {
159156
Settings nodeSettings = Settings.builder()
160157
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "private_dns")
161158
.build();
162-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
163-
assertThat(discoveryNodes, hasSize(nodes));
159+
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
160+
assertThat(dynamicHosts, hasSize(nodes));
164161
// We check that we are using here expected address
165162
int node = 1;
166-
for (DiscoveryNode discoveryNode : discoveryNodes) {
163+
for (TransportAddress address : dynamicHosts) {
167164
String instanceId = "node" + node++;
168-
TransportAddress address = discoveryNode.getAddress();
169165
TransportAddress expected = poorMansDNS.get(
170166
AmazonEC2Mock.PREFIX_PRIVATE_DNS + instanceId + AmazonEC2Mock.SUFFIX_PRIVATE_DNS);
171167
assertEquals(address, expected);
@@ -182,13 +178,12 @@ public void testPublicDns() throws InterruptedException {
182178
Settings nodeSettings = Settings.builder()
183179
.put(AwsEc2Service.HOST_TYPE_SETTING.getKey(), "public_dns")
184180
.build();
185-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes);
186-
assertThat(discoveryNodes, hasSize(nodes));
181+
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes);
182+
assertThat(dynamicHosts, hasSize(nodes));
187183
// We check that we are using here expected address
188184
int node = 1;
189-
for (DiscoveryNode discoveryNode : discoveryNodes) {
185+
for (TransportAddress address : dynamicHosts) {
190186
String instanceId = "node" + node++;
191-
TransportAddress address = discoveryNode.getAddress();
192187
TransportAddress expected = poorMansDNS.get(
193188
AmazonEC2Mock.PREFIX_PUBLIC_DNS + instanceId + AmazonEC2Mock.SUFFIX_PUBLIC_DNS);
194189
assertEquals(address, expected);
@@ -201,7 +196,7 @@ public void testInvalidHostType() throws InterruptedException {
201196
.build();
202197

203198
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
204-
buildDynamicNodes(nodeSettings, 1);
199+
buildDynamicHosts(nodeSettings, 1);
205200
});
206201
assertThat(exception.getMessage(), containsString("does_not_exist is unknown for discovery.ec2.host_type"));
207202
}
@@ -227,8 +222,8 @@ public void testFilterByTags() throws InterruptedException {
227222
}
228223

229224
logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
230-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
231-
assertThat(discoveryNodes, hasSize(prodInstances));
225+
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
226+
assertThat(dynamicHosts, hasSize(prodInstances));
232227
}
233228

234229
public void testFilterByMultipleTags() throws InterruptedException {
@@ -258,8 +253,8 @@ public void testFilterByMultipleTags() throws InterruptedException {
258253
}
259254

260255
logger.info("started [{}] instances with [{}] stage=prod tag", nodes, prodInstances);
261-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
262-
assertThat(discoveryNodes, hasSize(prodInstances));
256+
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
257+
assertThat(dynamicHosts, hasSize(prodInstances));
263258
}
264259

265260
public void testReadHostFromTag() throws InterruptedException, UnknownHostException {
@@ -285,11 +280,11 @@ public void testReadHostFromTag() throws InterruptedException, UnknownHostExcept
285280
}
286281

287282
logger.info("started [{}] instances", nodes);
288-
List<DiscoveryNode> discoveryNodes = buildDynamicNodes(nodeSettings, nodes, tagsList);
289-
assertThat(discoveryNodes, hasSize(nodes));
290-
for (DiscoveryNode discoveryNode : discoveryNodes) {
291-
TransportAddress address = discoveryNode.getAddress();
292-
TransportAddress expected = poorMansDNS.get(discoveryNode.getName());
283+
List<TransportAddress> dynamicHosts = buildDynamicHosts(nodeSettings, nodes, tagsList);
284+
assertThat(dynamicHosts, hasSize(nodes));
285+
int node = 1;
286+
for (TransportAddress address : dynamicHosts) {
287+
TransportAddress expected = poorMansDNS.get("node" + node++);
293288
assertEquals(address, expected);
294289
}
295290
}
@@ -306,13 +301,13 @@ public void testGetNodeListEmptyCache() throws Exception {
306301
AwsEc2Service awsEc2Service = new AwsEc2ServiceMock(Settings.EMPTY, 1, null);
307302
DummyEc2HostProvider provider = new DummyEc2HostProvider(Settings.EMPTY, transportService, awsEc2Service) {
308303
@Override
309-
protected List<DiscoveryNode> fetchDynamicNodes() {
304+
protected List<TransportAddress> fetchDynamicNodes() {
310305
fetchCount++;
311306
return new ArrayList<>();
312307
}
313308
};
314309
for (int i=0; i<3; i++) {
315-
provider.buildDynamicNodes();
310+
provider.buildDynamicHosts();
316311
}
317312
assertThat(provider.fetchCount, is(3));
318313
}
@@ -323,18 +318,18 @@ public void testGetNodeListCached() throws Exception {
323318
try (Ec2DiscoveryPluginMock plugin = new Ec2DiscoveryPluginMock(Settings.EMPTY)) {
324319
DummyEc2HostProvider provider = new DummyEc2HostProvider(builder.build(), transportService, plugin.ec2Service) {
325320
@Override
326-
protected List<DiscoveryNode> fetchDynamicNodes() {
321+
protected List<TransportAddress> fetchDynamicNodes() {
327322
fetchCount++;
328-
return Ec2DiscoveryTests.this.buildDynamicNodes(Settings.EMPTY, 1);
323+
return Ec2DiscoveryTests.this.buildDynamicHosts(Settings.EMPTY, 1);
329324
}
330325
};
331326
for (int i=0; i<3; i++) {
332-
provider.buildDynamicNodes();
327+
provider.buildDynamicHosts();
333328
}
334329
assertThat(provider.fetchCount, is(1));
335330
Thread.sleep(1_000L); // wait for cache to expire
336331
for (int i=0; i<3; i++) {
337-
provider.buildDynamicNodes();
332+
provider.buildDynamicHosts();
338333
}
339334
assertThat(provider.fetchCount, is(2));
340335
}

0 commit comments

Comments
 (0)