Skip to content

Shortcut query phase using the results of other shards (#51852) #53659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ static SearchRequest[] sliceIntoSubRequests(SearchRequest request, String field,
if (request.source().slice() != null) {
throw new IllegalStateException("Can't slice a request that already has a slice configuration");
}
slicedSource = request.source().copyWithNewSlice(sliceBuilder);
slicedSource = request.source().shallowCopy().slice(sliceBuilder);
}
SearchRequest searchRequest = new SearchRequest(request);
searchRequest.source(slicedSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ setup:
created_at:
type: date
format: "strict_date"

- do:
indices.create:
index: index_3
Expand Down Expand Up @@ -154,31 +155,3 @@ setup:
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

# check that empty responses are correctly handled when rewriting to match_no_docs
- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 1
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 2 }
- length: { aggregations.idx_terms.buckets: 2 }

- do:
search:
rest_total_hits_as_int: true
# ensure that one shard can return empty response
max_concurrent_shard_requests: 2
body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

- match: { _shards.total: 3 }
- match: { _shards.successful: 3 }
- match: { _shards.skipped : 0 }
- match: { _shards.failed: 0 }
- match: { hits.total: 0 }
- length: { aggregations.idx_terms.buckets: 0 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# REST test suite for the distributed-sort shortcut: verifies that shards can
# return empty/partial responses (rewritten to match_no_docs or bottom-sort
# skipped) without breaking total hit counts, sort order, or aggregations.
setup:
  # Three single-shard indices so each shard holds exactly one document and
  # per-shard skipping behavior is deterministic.
  - do:
      indices.create:
        index: index_1
        body:
          settings:
            number_of_shards: 1
          mappings:
            properties:
              created_at:
                type: date
                format: "strict_date"
  - do:
      indices.create:
        index: index_2
        body:
          settings:
            number_of_shards: 1
          mappings:
            properties:
              created_at:
                type: date
                format: "strict_date"

  - do:
      indices.create:
        index: index_3
        body:
          settings:
            number_of_shards: 1
          mappings:
            properties:
              created_at:
                type: date
                format: "strict_date"


---
"test distributed sort can rewrite query to match no docs":

  - skip:
      version: " - 7.6.99"
      reason: "distributed sort optimization was added in 7.7.0"
  # One document per index, with distinct years so range queries can exclude
  # individual shards.
  - do:
      index:
        index: index_1
        id: 1
        body: { "created_at": "2016-01-01"}
  - do:
      index:
        index: index_2
        id: 2
        body: { "created_at": "2017-01-01" }

  - do:
      index:
        index: index_3
        id: 3
        body: { "created_at": "2018-01-01" }
  - do:
      indices.refresh: {}

  # check that empty responses are correctly handled when rewriting to match_no_docs
  - do:
      search:
        # ensure that one shard can return empty response
        max_concurrent_shard_requests: 1
        body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2016-02-01", "lt": "2018-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

  - match: { _shards.total: 3 }
  - match: { _shards.successful: 3 }
  - match: { _shards.skipped : 0 }
  - match: { _shards.failed: 0 }
  - match: { hits.total.value: 2 }
  - length: { aggregations.idx_terms.buckets: 2 }

  # Same query shape but nothing matches at all: every shard returns empty.
  - do:
      search:
        # ensure that one shard can return empty response
        max_concurrent_shard_requests: 2
        body: { "size" : 0, "query" : { "range" : { "created_at" : { "gte" : "2019-02-01"}}}, "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } } }

  - match: { _shards.total: 3 }
  - match: { _shards.successful: 3 }
  - match: { _shards.skipped : 0 }
  - match: { _shards.failed: 0 }
  - match: { hits.total.value: 0 }
  - length: { aggregations.idx_terms.buckets: 0 }

  # check field sort is correct when skipping query phase
  # track_total_hits: 1 allows shards to stop counting early, hence the
  # "gte" relation assertion below rather than "eq".
  - do:
      search:
        # ensure that one shard can return empty response
        max_concurrent_shard_requests: 1
        pre_filter_shard_size: 1
        body:
          "size": 1
          "track_total_hits": 1
          "sort": [{ "created_at": { "order": "desc" } }]

  - match: { _shards.total: 3 }
  - match: { _shards.successful: 3 }
  - match: { _shards.skipped: 0 }
  - match: { _shards.failed: 0 }
  - match: { hits.total.value: 1 }
  - match: { hits.total.relation: "gte" }
  - length: { hits.hits: 1 }
  # Descending date sort: the 2018 doc (index_3, id 3) must win.
  - match: { hits.hits.0._id: "3" }

  # same with aggs
  # Aggregations force every shard to execute, so all three index buckets
  # must still appear even though the query phase is shortcut for hits.
  - do:
      search:
        # ensure that one shard can return empty response
        max_concurrent_shard_requests: 1
        pre_filter_shard_size: 1
        body:
          "size": 1
          "track_total_hits": 1
          "sort": [{ "created_at": { "order": "desc" } }]
          "aggs" : { "idx_terms" : { "terms" : { "field" : "_index" } } }

  - match: { _shards.total: 3 }
  - match: { _shards.successful: 3 }
  - match: { _shards.skipped: 0 }
  - match: { _shards.failed: 0 }
  - match: { hits.total.value: 1 }
  - match: { hits.total.relation: "gte" }
  - length: { hits.hits: 1 }
  - match: { hits.hits.0._id: "3" }
  - length: { aggregations.idx_terms.buckets: 3 }
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
private final SearchPhaseResults<Result> results;
final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
Expand Down Expand Up @@ -467,7 +467,7 @@ public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarg
* @param result the result returned form the shard
* @param shardIt the shard iterator
*/
private void onShardResult(Result result, SearchShardIterator shardIt) {
protected void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getShardIndex() != -1 : "shard index is not set";
assert result.getSearchShardTarget() != null : "search shard target must not be null";
successfulOps.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopFieldDocs;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchSortValuesAndFormats;

/**
* Utility class to keep track of the bottom doc's sort values in a distributed search.
*/
/**
 * Utility class to keep track of the bottom doc's sort values in a distributed search.
 *
 * <p>Each shard's {@link TopFieldDocs} is fed to {@link #consumeTopDocs}; when a shard
 * returned a full page of {@code topNSize} hits, its last (bottom) doc's sort values are
 * candidates for the globally best-known bottom. Later shards can use these values to
 * skip non-competitive documents.
 *
 * <p>Thread-safety: {@link #consumeTopDocs} is {@code synchronized}; {@link #getTotalHits}
 * and {@link #getBottomSortValues} read {@code volatile} fields without locking, so readers
 * see a consistent single value but no cross-field atomicity is guaranteed.
 */
class BottomSortValuesCollector {
    // Requested size of the top-N page; a shard's bottom doc only counts when the
    // shard actually filled the page (see extractBottom).
    private final int topNSize;
    // The sort this collector was created with; shard responses must match it exactly.
    private final SortField[] sortFields;
    // One comparator per sort field, used only through compareValues (hence size 1).
    private final FieldComparator[] comparators;
    // +1 for ascending, -1 for descending, per sort field.
    private final int[] reverseMuls;

    // Running sum of per-shard total hits. NOTE(review): incremented even for shards
    // whose sort fields fail validation below — confirm that is intended.
    private volatile long totalHits;
    // Best (most competitive) bottom sort values seen so far; null until a shard
    // produces a full page.
    private volatile SearchSortValuesAndFormats bottomSortValues;

    BottomSortValuesCollector(int topNSize, SortField[] sortFields) {
        this.topNSize = topNSize;
        this.comparators = new FieldComparator[sortFields.length];
        this.reverseMuls = new int[sortFields.length];
        this.sortFields = sortFields;
        for (int i = 0; i < sortFields.length; i++) {
            comparators[i] = sortFields[i].getComparator(1, i);
            reverseMuls[i] = sortFields[i].getReverse() ? -1 : 1;
        }
    }

    /** @return the accumulated total hit count across all consumed shard responses. */
    long getTotalHits() {
        return totalHits;
    }

    /**
     * @return The best bottom sort values consumed so far.
     */
    SearchSortValuesAndFormats getBottomSortValues() {
        return bottomSortValues;
    }

    /**
     * Consumes one shard's top docs: accumulates its total hits and, if the shard
     * filled the top-N page with a compatible sort, considers its bottom doc's sort
     * values as the new global bottom.
     *
     * @param topDocs          the shard's sorted top docs
     * @param sortValuesFormat doc-value formats matching the sort fields, used to
     *                         wrap the raw bottom values
     */
    synchronized void consumeTopDocs(TopFieldDocs topDocs, DocValueFormat[] sortValuesFormat) {
        totalHits += topDocs.totalHits.value;
        if (validateShardSortFields(topDocs.fields) == false) {
            return;
        }

        FieldDoc shardBottomDoc = extractBottom(topDocs);
        if (shardBottomDoc == null) {
            return;
        }
        // Keep the smaller (more competitive) bottom: a lower value per the sort
        // order means later shards can skip more documents.
        if (bottomSortValues == null
                || compareValues(shardBottomDoc.fields, bottomSortValues.getRawSortValues()) < 0) {
            bottomSortValues = new SearchSortValuesAndFormats(shardBottomDoc.fields, sortValuesFormat);
        }
    }

    /**
     * @return <code>false</code> if the provided {@link SortField} array differs
     * from the initial {@link BottomSortValuesCollector#sortFields}.
     */
    // NOTE(review): assumes shardSortFields.length <= sortFields.length; a longer
    // shard array would throw ArrayIndexOutOfBoundsException — confirm callers
    // guarantee equal lengths.
    private boolean validateShardSortFields(SortField[] shardSortFields) {
        for (int i = 0; i < shardSortFields.length; i++) {
            if (shardSortFields[i].equals(sortFields[i]) == false) {
                // ignore shards response that would make the sort incompatible
                // (e.g.: mixing keyword/numeric or long/double).
                // TODO: we should fail the entire request because the topdocs
                // merge will likely fail later but this is not possible with
                // the current async logic that only allows shard failures here.
                return false;
            }
        }
        return true;
    }

    // The bottom doc is only meaningful when the shard filled the whole top-N page;
    // a partially-filled page means every doc on that shard was already collected.
    private FieldDoc extractBottom(TopFieldDocs topDocs) {
        return topNSize > 0 && topDocs.scoreDocs.length == topNSize ?
            (FieldDoc) topDocs.scoreDocs[topNSize-1] : null;
    }

    // Lexicographic comparison over the sort fields, applying each field's
    // ascending/descending direction; negative means v1 is more competitive.
    private int compareValues(Object[] v1, Object[] v2) {
        for (int i = 0; i < v1.length; i++) {
            int cmp = reverseMuls[i] * comparators[i].compareValues(v1[i], v2[i]);
            if (cmp != 0) {
                return cmp;
            }
        }
        return 0;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
for (SearchPhaseResult entry : queryResults) {
QuerySearchResult result = entry.queryResult();
from = result.from();
size = result.size();
// sorted queries can set the size to 0 if they have enough competitive hits.
size = Math.max(result.size(), size);
if (hasSuggest) {
assert result.suggest() != null;
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
Expand Down Expand Up @@ -725,15 +726,6 @@ int getNumBuffered() {
int getNumReducePhases() { return numReducePhases; }
}

private int resolveTrackTotalHits(SearchRequest request) {
if (request.scroll() != null) {
// no matter what the value of track_total_hits is
return SearchContext.TRACK_TOTAL_HITS_ACCURATE;
}
return request.source() == null ? SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo() == null ?
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : request.source().trackTotalHitsUpTo();
}

/**
* Returns a new ArraySearchPhaseResults instance. This might return an instance that reduces search responses incrementally.
*/
Expand All @@ -744,7 +736,7 @@ ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResults(SearchProgressL
boolean isScrollRequest = request.scroll() != null;
final boolean hasAggs = source != null && source.aggregations() != null;
final boolean hasTopDocs = source == null || source.size() != 0;
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
final int trackTotalHitsUpTo = request.resolveTrackTotalHitsUpTo();
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder = requestToAggReduceContextBuilder.apply(request);
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
Expand Down
Loading