Skip to content

Introduce batched query execution and data-node side reduce (#121885) #126563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/121885.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Release-notes changelog entry for PR 121885 (presumably consumed by the
# Elasticsearch release-notes tooling under docs/changelog/).
pr: 121885
summary: Introduce batched query execution and data-node side reduce
area: Search
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/126385.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Release-notes changelog entry for PR 126385: a bug fix in the Search area
# that closes issue 126118 (empty top-docs results filtered out before merge).
pr: 126385
summary: Filter out empty top docs results before merging
area: Search
type: bug
issues:
- 126118
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
Expand All @@ -24,6 +25,7 @@
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;

Expand All @@ -50,6 +52,13 @@ public static void setDebugLogLevel() {
// Per-test setup: installs the error-trace listener and disables batched query
// execution for the duration of the test (restored in resetSettings()).
@Before
public void setupMessageListener() {
// Presumably records whether error traces were observed on transport messages
// for later assertions -- see ErrorTraceHelper; TODO confirm semantics.
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
// Batched query phase is explicitly switched off because the listener set up
// above does not yet work with batched execution (per the TODO above).
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

// Per-test teardown: clears the BATCHED_QUERY_PHASE override (putNull restores
// the cluster default) so later tests run with the stock configuration.
@After
public void resetSettings() {
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

private void setupIndexWithDocs() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
setup:
- skip:
awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"

- do:
indices.create:
index: test_1
Expand Down Expand Up @@ -48,7 +51,6 @@ setup:
batched_reduce_size: 2
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }

- match: { num_reduce_phases: 4 }
- match: { hits.total: 3 }
- length: { aggregations.str_terms.buckets: 2 }
- match: { aggregations.str_terms.buckets.0.key: "abc" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,11 +574,8 @@ public void testSearchQueryThenFetch() throws Exception {
);

clearInterceptedActions();
assertIndicesSubset(
Arrays.asList(searchRequest.indices()),
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME
);
assertIndicesSubset(Arrays.asList(searchRequest.indices()), true, SearchTransportService.QUERY_ACTION_NAME);
assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.FETCH_ID_ACTION_NAME);
}

public void testSearchDfsQueryThenFetch() throws Exception {
Expand Down Expand Up @@ -631,10 +628,6 @@ private static void assertIndicesSubset(List<String> indices, String... actions)
assertIndicesSubset(indices, false, actions);
}

private static void assertIndicesSubsetOptionalRequests(List<String> indices, String... actions) {
assertIndicesSubset(indices, true, actions);
}

private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -352,6 +353,8 @@ public void testTransportBulkTasks() {
}

public void testSearchTaskDescriptions() {
// TODO: enhance this test to also check the tasks created by batched query execution
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task
registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task
createIndex("test");
Expand Down Expand Up @@ -398,7 +401,7 @@ public void testSearchTaskDescriptions() {
// assert that all task descriptions have non-zero length
assertThat(taskInfo.description().length(), greaterThan(0));
}

updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

public void testSearchTaskHeaderLimit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
Expand Down Expand Up @@ -445,6 +446,7 @@ public void testSearchIdle() throws Exception {
}

public void testCircuitBreakerReduceFail() throws Exception {
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
int numShards = randomIntBetween(1, 10);
indexSomeDocs("test", numShards, numShards * 3);

Expand Down Expand Up @@ -518,7 +520,9 @@ public void onFailure(Exception exc) {
}
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
} finally {
updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
updateClusterSettings(
Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
Expand Down Expand Up @@ -239,6 +240,8 @@ public void testCancelMultiSearch() throws Exception {
}

public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
// TODO: make this test compatible with batched execution, currently the exceptions are slightly different with batched
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
// Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
// are limited to 1
internalCluster().ensureAtLeastNumDataNodes(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -50,6 +53,18 @@ public static String randomExecutionHint() {

private static int numRoutingValues;

// Per-test setup: forces non-batched query execution so the terms-aggregation
// assertions in this suite remain deterministic (cleared again in resetSettings()).
@Before
public void disableBatchedExecution() {
// TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to
// still do something useful with batched execution (i.e. use somewhat relaxed assertions)
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

// Per-test teardown: removes the BATCHED_QUERY_PHASE override installed in
// disableBatchedExecution() (putNull reverts the setting to its default).
@After
public void resetSettings() {
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ static TransportVersion def(int id) {
public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16);
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS_8_19 = def(8_841_0_17);
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG_8_19 = def(8_841_0_18);
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION_BACKPORT_8_X = def(8_841_0_19);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,33 +65,33 @@
* distributed frequencies
*/
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
private static final float DEFAULT_INDEX_BOOST = 1.0f;
protected static final float DEFAULT_INDEX_BOOST = 1.0f;
private final Logger logger;
private final NamedWriteableRegistry namedWriteableRegistry;
private final SearchTransportService searchTransportService;
protected final SearchTransportService searchTransportService;
private final Executor executor;
private final ActionListener<SearchResponse> listener;
private final SearchRequest request;
protected final SearchRequest request;

/**
* Used by subclasses to resolve node ids to DiscoveryNodes.
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
protected final SearchTask task;
protected final SearchPhaseResults<Result> results;
private final long clusterStateVersion;
private final TransportVersion minTransportVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
protected final Map<String, AliasFilter> aliasFilter;
protected final Map<String, Float> concreteIndexBoosts;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps;
private final SearchTimeProvider timeProvider;
protected final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
protected final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -231,10 +231,17 @@ protected final void run() {
onPhaseDone();
return;
}
if (shardsIts.isEmpty()) {
return;
}
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
for (int i = 0; i < shardIterators.length; i++) {
shardIndexMap.put(shardIterators[i], i);
}
doRun(shardIndexMap);
}

protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
doCheckNoMissingShards(getName(), request, shardsIts);
Version version = request.minCompatibleShardNode();
if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
Expand Down Expand Up @@ -275,7 +282,7 @@ private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
return true;
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
if (throttleConcurrentRequests) {
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
shard.getNodeId(),
Expand Down Expand Up @@ -315,7 +322,7 @@ public void onFailure(Exception e) {
executePhaseOnShard(shardIt, connection, shardListener);
}

private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
protected final void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias());
onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
Expand Down Expand Up @@ -422,7 +429,7 @@ private ShardSearchFailure[] buildShardFailures() {
return failures;
}

private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
protected final void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void onFailure(Exception e) {
}
}

private record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}
public record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}

private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<SearchShardIterator>> entry) {
final SearchShardIterator first = entry.getValue().get(0);
Expand Down
Loading