Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCS: don't proxy requests for already connected node #31273

Merged
merged 4 commits into from
Jun 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -351,8 +350,7 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
OriginalIndices localIndices,
List<SearchShardIterator> remoteShardIterators) {
List<SearchShardIterator> shards = new ArrayList<>();
shards.addAll(remoteShardIterators);
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
for (ShardIterator shardIterator : localShardsIterator) {
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
}
Expand Down Expand Up @@ -384,7 +382,7 @@ private AbstractSearchAsyncAction searchAsyncAction(SearchTask task, SearchReque
clusterStateVersion, aliasFilter, concreteIndexBoosts, indexRoutings, listener, false, clusters);
return new SearchPhase(action.getName()) {
@Override
public void run() throws IOException {
public void run() {
action.start();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
Expand All @@ -40,6 +39,7 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
Expand All @@ -50,7 +50,6 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand All @@ -61,7 +60,6 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -181,7 +179,7 @@ public void ensureConnected(ActionListener<Void> voidActionListener) {

private void fetchShardsInternal(ClusterSearchShardsRequest searchShardsRequest,
final ActionListener<ClusterSearchShardsResponse> listener) {
final DiscoveryNode node = connectedNodes.get();
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterSearchShardsAction.NAME, searchShardsRequest,
new TransportResponseHandler<ClusterSearchShardsResponse>() {

Expand Down Expand Up @@ -217,7 +215,7 @@ void collectNodes(ActionListener<Function<String, DiscoveryNode>> listener) {
request.clear();
request.nodes(true);
request.local(true); // run this on the node that gets the request it's as good as any other
final DiscoveryNode node = connectedNodes.get();
final DiscoveryNode node = connectedNodes.getAny();
transportService.sendRequest(node, ClusterStateAction.NAME, request, TransportRequestOptions.EMPTY,
new TransportResponseHandler<ClusterStateResponse>() {
@Override
Expand Down Expand Up @@ -255,40 +253,52 @@ public String executor() {
}

/**
* Returns a connection to the remote cluster. This connection might be a proxy connection that redirects internally to the
* given node.
* Returns a connection to the remote cluster, preferably a direct connection to the provided {@link DiscoveryNode}.
* If such node is not connected, the returned connection will be a proxy connection that redirects to it.
*/
Transport.Connection getConnection(DiscoveryNode remoteClusterNode) {
DiscoveryNode discoveryNode = connectedNodes.get();
if (transportService.nodeConnected(remoteClusterNode)) {
return transportService.getConnection(remoteClusterNode);
}
DiscoveryNode discoveryNode = connectedNodes.getAny();
Transport.Connection connection = transportService.getConnection(discoveryNode);
return new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return remoteClusterNode;
}
return new ProxyConnection(connection, remoteClusterNode);
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
static final class ProxyConnection implements Transport.Connection {
private final Transport.Connection proxyConnection;
private final DiscoveryNode targetNode;

private ProxyConnection(Transport.Connection proxyConnection, DiscoveryNode targetNode) {
this.proxyConnection = proxyConnection;
this.targetNode = targetNode;
}

@Override
public DiscoveryNode getNode() {
return targetNode;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(remoteClusterNode, request), options);
}
proxyConnection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
TransportActionProxy.wrapRequest(targetNode, request), options);
}

@Override
public void close() throws IOException {
assert false: "proxy connections must not be closed";
}
@Override
public void close() {
assert false: "proxy connections must not be closed";
}

@Override
public Version getVersion() {
return connection.getVersion();
}
};
@Override
public Version getVersion() {
return proxyConnection.getVersion();
}
}

Transport.Connection getConnection() {
DiscoveryNode discoveryNode = connectedNodes.get();
return transportService.getConnection(discoveryNode);
return transportService.getConnection(getAnyConnectedNode());
}

@Override
Expand Down Expand Up @@ -385,7 +395,7 @@ public void onFailure(Exception e) {
}

@Override
protected void doRun() throws Exception {
protected void doRun() {
ActionListener<Void> listener = ActionListener.wrap((x) -> {
synchronized (queue) {
running.release();
Expand Down Expand Up @@ -590,8 +600,8 @@ boolean isNodeConnected(final DiscoveryNode node) {
return connectedNodes.contains(node);
}

DiscoveryNode getConnectedNode() {
return connectedNodes.get();
DiscoveryNode getAnyConnectedNode() {
return connectedNodes.getAny();
}

void addConnectedNode(DiscoveryNode node) {
Expand All @@ -612,7 +622,7 @@ int getNumNodesConnected() {
return connectedNodes.size();
}

private static class ConnectedNodes implements Supplier<DiscoveryNode> {
private static final class ConnectedNodes {

private final Set<DiscoveryNode> nodeSet = new HashSet<>();
private final String clusterAlias;
Expand All @@ -623,8 +633,7 @@ private ConnectedNodes(String clusterAlias) {
this.clusterAlias = clusterAlias;
}

@Override
public synchronized DiscoveryNode get() {
public synchronized DiscoveryNode getAny() {
ensureIteratorAvailable();
if (currentIterator.hasNext()) {
return currentIterator.next();
Expand Down Expand Up @@ -657,15 +666,6 @@ synchronized boolean contains(DiscoveryNode node) {
return nodeSet.contains(node);
}

synchronized Optional<DiscoveryNode> getAny() {
ensureIteratorAvailable();
if (currentIterator.hasNext()) {
return Optional.of(currentIterator.next());
} else {
return Optional.empty();
}
}

private synchronized void ensureIteratorAvailable() {
if (currentIterator == null) {
currentIterator = nodeSet.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.Matchers.startsWith;

public class RemoteClusterConnectionTests extends ESTestCase {
Expand Down Expand Up @@ -992,7 +993,7 @@ public void testConnectedNodesConcurrentAccess() throws IOException, Interrupted
barrier.await();
for (int j = 0; j < numGetCalls; j++) {
try {
DiscoveryNode node = connection.getConnectedNode();
DiscoveryNode node = connection.getAnyConnectedNode();
assertNotNull(node);
} catch (IllegalStateException e) {
if (e.getMessage().startsWith("No node available for cluster:") == false) {
Expand Down Expand Up @@ -1053,10 +1054,10 @@ public void testClusterNameIsChecked() throws Exception {
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool, settings);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT, threadPool,
settings);
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
MockTransportService otherClusterTransport = startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build());
MockTransportService otherClusterDiscoverable= startTransport("other_cluster_discoverable_node", otherClusterKnownNodes,
Version.CURRENT, threadPool, Settings.builder().put("cluster.name", "otherCluster").build())) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
Expand Down Expand Up @@ -1093,4 +1094,76 @@ public void testClusterNameIsChecked() throws Exception {
}
}
}

public void testGetConnection() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {

DiscoveryNode connectedNode = seedTransport.getLocalDiscoNode();
assertThat(connectedNode, notNullValue());
knownNodes.add(connectedNode);

DiscoveryNode disconnectedNode = discoverableTransport.getLocalDiscoNode();
assertThat(disconnectedNode, notNullValue());
knownNodes.add(disconnectedNode);

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
Transport.Connection seedConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return connectedNode;
}

@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws TransportException {
// no-op
}

@Override
public void close() {
// no-op
}
};
service.addDelegate(connectedNode.getAddress(), new MockTransportService.DelegateTransport(service.getOriginalTransport()) {
@Override
public Connection getConnection(DiscoveryNode node) {
if (node == connectedNode) {
return seedConnection;
}
return super.getConnection(node);
}

@Override
public boolean nodeConnected(DiscoveryNode node) {
return node.equals(connectedNode);
}
});
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Collections.singletonList(connectedNode), service, Integer.MAX_VALUE, n -> true)) {
connection.addConnectedNode(connectedNode);
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
Transport.Connection remoteConnection = connection.getConnection(connectedNode);
assertSame(seedConnection, remoteConnection);
}
for (int i = 0; i < 10; i++) {
//always a direct connection as the remote node is already connected
Transport.Connection remoteConnection = connection.getConnection(service.getLocalNode());
assertThat(remoteConnection, not(instanceOf(RemoteClusterConnection.ProxyConnection.class)));
assertThat(remoteConnection.getNode(), equalTo(service.getLocalNode()));
}
for (int i = 0; i < 10; i++) {
//always a proxy connection as the target node is not connected
Transport.Connection remoteConnection = connection.getConnection(disconnectedNode);
assertThat(remoteConnection, instanceOf(RemoteClusterConnection.ProxyConnection.class));
assertThat(remoteConnection.getNode(), sameInstance(disconnectedNode));
}
}
}
}
}
}