Skip to content

Commit 06efab0

Browse files
committed
Add support for local cluster alias to SearchRequest (#36997)
With the upcoming cross-cluster search alternate execution mode, the CCS node will be able to split a CCS request into multiple search requests, one per remote cluster involved. In order to do that, the CCS node has to be able to signal to each remote cluster that such sub-requests are part of a CCS request. Each cluster does not know about the other clusters involved, and does not know either what alias it is given in the CCS node, hence the CCS coordinating node needs to be able to provide the alias as part of the search request so that it is used as index prefix in the returned search hits. The cluster alias is a notion that's already supported in the search shards iterator and search shard target, but it is currently used in CCS as both index prefix and connection lookup key when fanning out to all the shards. With CCS alternate execution mode the provided cluster alias needs to be used only as index prefix, as shards are local to each cluster hence no cluster alias should be used for connection lookups. The local cluster alias can be set to the SearchRequest at the transport layer only, and its constructor/getter methods are package private. Relates to #32125
1 parent c5ce3ec commit 06efab0

26 files changed

+582
-131
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,16 +318,16 @@ public final void onFailure(Exception e) {
318318
listener.onFailure(e);
319319
}
320320

321+
@Override
321322
public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIterator shardIt) {
322-
String clusterAlias = shardIt.getClusterAlias();
323323
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
324324
assert filter != null;
325325
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
326326
String indexName = shardIt.shardId().getIndex().getName();
327327
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
328328
.toArray(new String[0]);
329329
return new ShardSearchTransportRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
330-
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), clusterAlias, routings);
330+
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
331331
}
332332

333333
/**

server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard,
8585
final SearchShardIterator shardIt, Exception e) {
8686
// we always add the shard failure for a specific shard instance
8787
// we do make sure to clean it on a successful response from a shard
88-
SearchShardTarget shardTarget = new SearchShardTarget(nodeId, shardIt.shardId(), shardIt.getClusterAlias(),
89-
shardIt.getOriginalIndices());
88+
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
9089
onShardFailure(shardIndex, shardTarget, e);
9190

9291
if (totalOps.incrementAndGet() == expectedTotalOps) {
@@ -209,8 +208,8 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
209208
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
210209
} else {
211210
try {
212-
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(new SearchShardTarget(shard.currentNodeId(),
213-
shardIt.shardId(), shardIt.getClusterAlias(), shardIt.getOriginalIndices()), shardIndex) {
211+
executePhaseOnShard(shardIt, shard, new SearchActionListener<FirstResult>(
212+
shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
214213
@Override
215214
public void innerOnResponse(FirstResult result) {
216215
maybeFork(thread, () -> onShardResult(result, shardIt));

server/src/main/java/org/elasticsearch/action/search/ScrollIdForNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public String getNode() {
3636
return node;
3737
}
3838

39+
@Nullable
3940
public String getClusterAlias() {
4041
return clusterAlias;
4142
}

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6565
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
6666
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;
6767

68+
private String localClusterAlias;
69+
6870
private SearchType searchType = SearchType.DEFAULT;
6971

7072
private String[] indices = Strings.EMPTY_ARRAY;
@@ -95,6 +97,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
9597
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;
9698

9799
public SearchRequest() {
100+
this.localClusterAlias = null;
98101
}
99102

100103
/**
@@ -114,6 +117,7 @@ public SearchRequest(SearchRequest searchRequest) {
114117
this.searchType = searchRequest.searchType;
115118
this.source = searchRequest.source;
116119
this.types = searchRequest.types;
120+
this.localClusterAlias = searchRequest.localClusterAlias;
117121
}
118122

119123
/**
@@ -128,13 +132,22 @@ public SearchRequest(String... indices) {
128132
* Constructs a new search request against the provided indices with the given search source.
129133
*/
130134
public SearchRequest(String[] indices, SearchSourceBuilder source) {
135+
this();
131136
if (source == null) {
132137
throw new IllegalArgumentException("source must not be null");
133138
}
134139
indices(indices);
135140
this.source = source;
136141
}
137142

143+
/**
144+
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
145+
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
146+
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
147+
*/
148+
SearchRequest(String localClusterAlias) {
149+
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
150+
}
138151

139152
@Override
140153
public ActionRequestValidationException validate() {
@@ -162,6 +175,16 @@ public ActionRequestValidationException validate() {
162175
return validationException;
163176
}
164177

178+
/**
179+
* Returns the alias of the cluster that this search request is being executed on. A non-null value indicates that this search request
180+
* is being executed as part of a locally reduced cross-cluster search request. The cluster alias is used to prefix index names
181+
* returned as part of search hits with the alias of the cluster they came from.
182+
*/
183+
@Nullable
184+
String getLocalClusterAlias() {
185+
return localClusterAlias;
186+
}
187+
165188
/**
166189
* Sets the indices the search will be executed on.
167190
*/
@@ -470,7 +493,12 @@ public void readFrom(StreamInput in) throws IOException {
470493
}
471494
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
472495
allowPartialSearchResults = in.readOptionalBoolean();
473-
}
496+
}
497+
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
498+
localClusterAlias = in.readOptionalString();
499+
} else {
500+
localClusterAlias = null;
501+
}
474502
}
475503

476504
@Override
@@ -495,7 +523,10 @@ public void writeTo(StreamOutput out) throws IOException {
495523
}
496524
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
497525
out.writeOptionalBoolean(allowPartialSearchResults);
498-
}
526+
}
527+
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
528+
out.writeOptionalString(localClusterAlias);
529+
}
499530
}
500531

501532
@Override
@@ -519,14 +550,15 @@ public boolean equals(Object o) {
519550
Objects.equals(maxConcurrentShardRequests, that.maxConcurrentShardRequests) &&
520551
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
521552
Objects.equals(indicesOptions, that.indicesOptions) &&
522-
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
553+
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
554+
Objects.equals(localClusterAlias, that.localClusterAlias);
523555
}
524556

525557
@Override
526558
public int hashCode() {
527559
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
528-
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
529-
allowPartialSearchResults);
560+
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
561+
allowPartialSearchResults, localClusterAlias);
530562
}
531563

532564
@Override
@@ -544,6 +576,7 @@ public String toString() {
544576
", batchedReduceSize=" + batchedReduceSize +
545577
", preFilterShardSize=" + preFilterShardSize +
546578
", allowPartialSearchResults=" + allowPartialSearchResults +
579+
", localClusterAlias=" + localClusterAlias +
547580
", source=" + source + '}';
548581
}
549582
}

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,28 +22,34 @@
2222
import org.elasticsearch.action.OriginalIndices;
2323
import org.elasticsearch.cluster.routing.PlainShardIterator;
2424
import org.elasticsearch.cluster.routing.ShardRouting;
25+
import org.elasticsearch.common.Nullable;
2526
import org.elasticsearch.index.shard.ShardId;
27+
import org.elasticsearch.search.SearchShardTarget;
2628

2729
import java.util.List;
2830

2931
/**
3032
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
31-
* of the search request. Useful especially with cross cluster search, as each cluster has its own set of original indices.
33+
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
34+
* the cluster alias.
35+
* @see OriginalIndices
3236
*/
3337
public final class SearchShardIterator extends PlainShardIterator {
3438

3539
private final OriginalIndices originalIndices;
36-
private String clusterAlias;
40+
private final String clusterAlias;
3741
private boolean skip = false;
3842

3943
/**
4044
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
4145
* this the a given <code>shardId</code>.
4246
*
47+
* @param clusterAlias the alias of the cluster where the shard is located
4348
* @param shardId shard id of the group
4449
* @param shards shards to iterate
50+
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
4551
*/
46-
public SearchShardIterator(String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
52+
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
4753
super(shardId, shards);
4854
this.originalIndices = originalIndices;
4955
this.clusterAlias = clusterAlias;
@@ -56,10 +62,22 @@ public OriginalIndices getOriginalIndices() {
5662
return originalIndices;
5763
}
5864

65+
/**
66+
* Returns the alias of the cluster where the shard is located.
67+
*/
68+
@Nullable
5969
public String getClusterAlias() {
6070
return clusterAlias;
6171
}
6272

73+
/**
74+
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
75+
* @see SearchShardTarget
76+
*/
77+
SearchShardTarget newSearchShardTarget(String nodeId) {
78+
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
79+
}
80+
6381
/**
6482
* Reset the iterator and mark it as skippable
6583
* @see #skip()

server/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ public static ShardSearchFailure readShardSearchFailure(StreamInput in) throws I
9898

9999
@Override
100100
public void readFrom(StreamInput in) throws IOException {
101-
if (in.readBoolean()) {
102-
shardTarget = new SearchShardTarget(in);
101+
shardTarget = in.readOptionalWriteable(SearchShardTarget::new);
102+
if (shardTarget != null) {
103103
index = shardTarget.getFullyQualifiedIndexName();
104104
shardId = shardTarget.getShardId().getId();
105105
}
@@ -110,12 +110,7 @@ public void readFrom(StreamInput in) throws IOException {
110110

111111
@Override
112112
public void writeTo(StreamOutput out) throws IOException {
113-
if (shardTarget == null) {
114-
out.writeBoolean(false);
115-
} else {
116-
out.writeBoolean(true);
117-
shardTarget.writeTo(out);
118-
}
113+
out.writeOptionalWriteable(shardTarget);
119114
out.writeString(reason);
120115
RestStatus.writeTo(out, status);
121116
out.writeException(cause);
@@ -175,7 +170,7 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
175170
SearchShardTarget searchShardTarget = null;
176171
if (nodeId != null) {
177172
searchShardTarget = new SearchShardTarget(nodeId,
178-
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
173+
new ShardId(new Index(indexName, IndexMetaData.INDEX_UUID_NA_VALUE), shardId), clusterAlias, OriginalIndices.NONE);
179174
}
180175
return new ShardSearchFailure(exception, searchShardTarget);
181176
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.elasticsearch.cluster.routing.GroupShardsIterator;
3636
import org.elasticsearch.cluster.routing.ShardIterator;
3737
import org.elasticsearch.cluster.service.ClusterService;
38+
import org.elasticsearch.common.Nullable;
3839
import org.elasticsearch.common.inject.Inject;
3940
import org.elasticsearch.common.settings.Setting;
4041
import org.elasticsearch.common.settings.Setting.Property;
@@ -61,6 +62,7 @@
6162
import java.util.Set;
6263
import java.util.concurrent.Executor;
6364
import java.util.function.BiFunction;
65+
import java.util.function.Function;
6466
import java.util.function.LongSupplier;
6567

6668
import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
@@ -311,7 +313,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
311313
GroupShardsIterator<ShardIterator> localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
312314
concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
313315
GroupShardsIterator<SearchShardIterator> shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
314-
remoteShardIterators);
316+
searchRequest.getLocalClusterAlias(), remoteShardIterators);
315317

316318
failIfOverShardCountLimit(clusterService, shardIterators.size());
317319

@@ -338,13 +340,8 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
338340
}
339341

340342
final DiscoveryNodes nodes = clusterState.nodes();
341-
BiFunction<String, String, Transport.Connection> connectionLookup = (clusterName, nodeId) -> {
342-
final DiscoveryNode discoveryNode = clusterName == null ? nodes.get(nodeId) : remoteConnections.apply(clusterName, nodeId);
343-
if (discoveryNode == null) {
344-
throw new IllegalStateException("no node found for id: " + nodeId);
345-
}
346-
return searchTransportService.getConnection(clusterName, discoveryNode);
347-
};
343+
BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
344+
nodes::get, remoteConnections, searchTransportService::getConnection);
348345
if (searchRequest.isMaxConcurrentShardRequestsSet() == false) {
349346
// we try to set a default of max concurrent shard requests based on
350347
// the node count but upper-bound it by 256 by default to keep it sane. A single
@@ -359,7 +356,27 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
359356
Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
360357
}
361358

362-
private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupShardsIterator<SearchShardIterator> shardIterators) {
359+
static BiFunction<String, String, Transport.Connection> buildConnectionLookup(String requestClusterAlias,
360+
Function<String, DiscoveryNode> localNodes,
361+
BiFunction<String, String, DiscoveryNode> remoteNodes,
362+
BiFunction<String, DiscoveryNode, Transport.Connection> nodeToConnection) {
363+
return (clusterAlias, nodeId) -> {
364+
final DiscoveryNode discoveryNode;
365+
if (clusterAlias == null || requestClusterAlias != null) {
366+
assert requestClusterAlias == null || requestClusterAlias.equals(clusterAlias);
367+
discoveryNode = localNodes.apply(nodeId);
368+
} else {
369+
discoveryNode = remoteNodes.apply(clusterAlias, nodeId);
370+
}
371+
if (discoveryNode == null) {
372+
throw new IllegalStateException("no node found for id: " + nodeId);
373+
}
374+
return nodeToConnection.apply(clusterAlias, discoveryNode);
375+
};
376+
}
377+
378+
private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
379+
GroupShardsIterator<SearchShardIterator> shardIterators) {
363380
SearchSourceBuilder source = searchRequest.source();
364381
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
365382
SearchService.canRewriteToMatchNone(source) &&
@@ -368,10 +385,11 @@ private boolean shouldPreFilterSearchShards(SearchRequest searchRequest, GroupSh
368385

369386
static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
370387
OriginalIndices localIndices,
388+
@Nullable String localClusterAlias,
371389
List<SearchShardIterator> remoteShardIterators) {
372390
List<SearchShardIterator> shards = new ArrayList<>(remoteShardIterators);
373391
for (ShardIterator shardIterator : localShardsIterator) {
374-
shards.add(new SearchShardIterator(null, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
392+
shards.add(new SearchShardIterator(localClusterAlias, shardIterator.shardId(), shardIterator.getShardRoutings(), localIndices));
375393
}
376394
return new GroupShardsIterator<>(shards);
377395
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ static String buildScrollId(AtomicArray<? extends SearchPhaseResult> searchPhase
4444
out.writeLong(searchPhaseResult.getRequestId());
4545
SearchShardTarget searchShardTarget = searchPhaseResult.getSearchShardTarget();
4646
if (searchShardTarget.getClusterAlias() != null) {
47-
out.writeString(RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(),
48-
searchShardTarget.getNodeId()));
47+
out.writeString(
48+
RemoteClusterAware.buildRemoteIndexName(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId()));
4949
} else {
5050
out.writeString(searchShardTarget.getNodeId());
5151
}

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ final class DefaultSearchContext extends SearchContext {
159159
DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
160160
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
161161
IndexShard indexShard, BigArrays bigArrays, Counter timeEstimateCounter, TimeValue timeout,
162-
FetchPhase fetchPhase, String clusterAlias, Version minNodeVersion) {
162+
FetchPhase fetchPhase, Version minNodeVersion) {
163163
this.id = id;
164164
this.request = request;
165165
this.fetchPhase = fetchPhase;
@@ -179,7 +179,7 @@ final class DefaultSearchContext extends SearchContext {
179179
this.timeout = timeout;
180180
this.minNodeVersion = minNodeVersion;
181181
queryShardContext = indexService.newQueryShardContext(request.shardId().id(), searcher.getIndexReader(), request::nowInMillis,
182-
clusterAlias);
182+
shardTarget.getClusterAlias());
183183
queryShardContext.setTypes(request.types());
184184
queryBoost = request.indexBoost();
185185
}

0 commit comments

Comments
 (0)