Skip to content

Commit

Permalink
Part 1: Support for cancel_after_timeinterval parameter in search and…
Browse files Browse the repository at this point in the history
… msearch request (opensearch-project#986)

* Part 1: Support for cancel_after_timeinterval parameter in search and msearch request

This commit introduces the new request level parameter to configure the timeout interval after which
a search request will be cancelled. For msearch request the parameter is supported both at parent
request and at sub child search requests. If it is provided at parent level and child search request
doesn't have it then the parent level value is set at such child request. The parent level msearch
is not used to cancel the parent request as it may be tricky to come up with correct value in cases
when child search request can have different runtimes

TEST: Added test for ser/de with new parameter

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Part 2: Support for cancel_after_timeinterval parameter in search and msearch request

This commit adds the handling of the new request level parameter and schedule cancellation task. It
also adds a cluster setting to set a global cancellation timeout for search request which will be
used in absence of request level timeout.

TEST: Added new tests in SearchCancellationIT
Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Address Review feedback for Part 1

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Address review feedback for Part 2

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Update CancellableTask to remove the cancelOnTimeout boolean flag

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Replace search.cancellation.timeout cluster setting with search.enforce_server.timeout.cancellation to control if cluster level cancel_after_time_interval should take precedence over request level cancel_after_time_interval value

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Removing the search.enforce_server.timeout.cancellation cluster setting and just keeping search.cancel_after_time_interval setting with request level parameter taking the precedence.

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

Co-authored-by: Sorabh Hamirwasia <hsorabh@amazon.com>
  • Loading branch information
sohami and Sorabh Hamirwasia committed Aug 12, 2021
1 parent 987bfcf commit 70eabe5
Show file tree
Hide file tree
Showing 17 changed files with 590 additions and 17 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static org.opensearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
import static org.opensearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue;
import static org.opensearch.common.xcontent.support.XContentMapValues.nodeStringValue;
import static org.opensearch.common.xcontent.support.XContentMapValues.nodeTimeValue;

/**
* A multi search API request.
Expand Down Expand Up @@ -272,6 +273,9 @@ public static void readMultiLineFormat(BytesReference data,
allowNoIndices = value;
} else if ("ignore_throttled".equals(entry.getKey()) || "ignoreThrottled".equals(entry.getKey())) {
ignoreThrottled = value;
} else if ("cancel_after_time_interval".equals(entry.getKey()) ||
"cancelAfterTimeInterval".equals(entry.getKey())) {
searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null));
} else {
throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section");
}
Expand Down Expand Up @@ -362,6 +366,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild
if (request.allowPartialSearchResults() != null) {
xContentBuilder.field("allow_partial_search_results", request.allowPartialSearchResults());
}
if (request.getCancelAfterTimeInterval() != null) {
xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep());
}
xContentBuilder.endObject();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.action.search;

import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
Expand Down Expand Up @@ -114,6 +115,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla

private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;

private TimeValue cancelAfterTimeInterval;

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
Expand Down Expand Up @@ -191,6 +194,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
this.localClusterAlias = localClusterAlias;
this.absoluteStartMillis = absoluteStartMillis;
this.finalReduce = finalReduce;
this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval;
}

/**
Expand Down Expand Up @@ -237,6 +241,10 @@ public SearchRequest(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) {
ccsMinimizeRoundtrips = in.readBoolean();
}

if (in.getVersion().onOrAfter(Version.V_1_1_0)) {
cancelAfterTimeInterval = in.readOptionalTimeValue();
}
}

@Override
Expand Down Expand Up @@ -271,6 +279,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) {
out.writeBoolean(ccsMinimizeRoundtrips);
}

if (out.getVersion().onOrAfter(Version.V_1_1_0)) {
out.writeOptionalTimeValue(cancelAfterTimeInterval);
}
}

@Override
Expand Down Expand Up @@ -669,9 +681,17 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s
SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo();
}

public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) {
this.cancelAfterTimeInterval = cancelAfterTimeInterval;
}

public TimeValue getCancelAfterTimeInterval() {
return cancelAfterTimeInterval;
}

@Override
public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers);
return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval);
}

public final String buildDescription() {
Expand Down Expand Up @@ -718,14 +738,15 @@ public boolean equals(Object o) {
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(localClusterAlias, that.localClusterAlias) &&
absoluteStartMillis == that.absoluteStartMillis &&
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips;
ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips &&
Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval);
}

@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips);
allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, cancelAfterTimeInterval);
}

@Override
Expand All @@ -746,6 +767,7 @@ public String toString() {
", localClusterAlias=" + localClusterAlias +
", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
", ccsMinimizeRoundtrips=" + ccsMinimizeRoundtrips +
", source=" + source + '}';
", source=" + source +
", cancelAfterTimeInterval=" + cancelAfterTimeInterval + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -626,4 +626,12 @@ public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) {
this.request.setPreFilterShardSize(preFilterShardSize);
return this;
}

/**
* Request level time interval to control how long search is allowed to execute after which it is cancelled.
*/
public SearchRequestBuilder setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) {
this.request.setCancelAfterTimeInterval(cancelAfterTimeInterval);
return this;
}
}
14 changes: 11 additions & 3 deletions server/src/main/java/org/opensearch/action/search/SearchTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@

package org.opensearch.action.search;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.TaskId;

import java.util.Map;
import java.util.function.Supplier;

import static org.opensearch.search.SearchService.NO_TIMEOUT;

/**
* Task storing information about a currently running {@link SearchRequest}.
*/
Expand All @@ -46,9 +49,14 @@ public class SearchTask extends CancellableTask {
private final Supplier<String> descriptionSupplier;
private SearchProgressListener progressListener = SearchProgressListener.NOOP;

public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier,
TaskId parentTaskId, Map<String, String> headers) {
super(id, type, action, null, parentTaskId, headers);
public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier, TaskId parentTaskId,
Map<String, String> headers) {
this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT);
}

public SearchTask(long id, String type, String action, Supplier<String> descriptionSupplier, TaskId parentTaskId,
Map<String, String> headers, TimeValue cancelAfterTimeInterval) {
super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval);
this.descriptionSupplier = descriptionSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.TimeoutTaskCancellationUtility;
import org.opensearch.client.Client;
import org.opensearch.client.OriginSettingClient;
import org.opensearch.client.node.NodeClient;
Expand Down Expand Up @@ -81,6 +82,7 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.ProfileShardResult;
import org.opensearch.search.profile.SearchProfileShardResults;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -121,6 +123,13 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
public static final Setting<Long> SHARD_COUNT_LIMIT_SETTING = Setting.longSetting(
"action.search.shard_count.limit", Long.MAX_VALUE, 1L, Property.Dynamic, Property.NodeScope);

// cluster level setting for timeout based search cancellation. If search request level parameter is present then that will take
// precedence over the cluster setting value
public static final String SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY = "search.cancel_after_time_interval";
public static final Setting<TimeValue> SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING =
Setting.timeSetting(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, SearchService.NO_TIMEOUT, Setting.Property.Dynamic,
Setting.Property.NodeScope);

private final NodeClient client;
private final ThreadPool threadPool;
private final ClusterService clusterService;
Expand Down Expand Up @@ -239,6 +248,14 @@ long buildTookInMillis() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// only if task is of type CancellableTask and support cancellation on timeout, treat this request eligible for timeout based
// cancellation. There may be other top level requests like AsyncSearch which is using SearchRequest internally and has it's own
// cancellation mechanism. For such cases, the SearchRequest when created can override the createTask and set the
// cancelAfterTimeInterval to NO_TIMEOUT and bypass this mechanism
if (task instanceof CancellableTask) {
listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(client, (CancellableTask) task,
clusterService.getClusterSettings(), listener);
}
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.opensearch.client.OriginSettingClient;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchService;
import org.opensearch.tasks.CancellableTask;
import org.opensearch.tasks.TaskId;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN;
import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING;

public class TimeoutTaskCancellationUtility {

private static final Logger logger = LogManager.getLogger(TimeoutTaskCancellationUtility.class);

/**
* Wraps a listener with a timeout listener {@link TimeoutRunnableListener} to schedule the task cancellation for provided tasks on
* generic thread pool
* @param client - {@link NodeClient}
* @param taskToCancel - task to schedule cancellation for
* @param clusterSettings - {@link ClusterSettings}
* @param listener - original listener associated with the task
* @return wrapped listener
*/
public static <Response> ActionListener<Response> wrapWithCancellationListener(NodeClient client, CancellableTask taskToCancel,
ClusterSettings clusterSettings, ActionListener<Response> listener) {
final TimeValue globalTimeout = clusterSettings.get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING);
final TimeValue timeoutInterval = (taskToCancel.getCancellationTimeout() == null) ? globalTimeout
: taskToCancel.getCancellationTimeout();
// Note: -1 (or no timeout) will help to turn off cancellation. The combinations will be request level set at -1 or request level
// set to null and cluster level set to -1.
ActionListener<Response> listenerToReturn = listener;
if (timeoutInterval.equals(SearchService.NO_TIMEOUT)) {
return listenerToReturn;
}

try {
final TimeoutRunnableListener<Response> wrappedListener = new TimeoutRunnableListener<>(timeoutInterval, listener, () -> {
final CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
cancelTasksRequest.setTaskId(new TaskId(client.getLocalNodeId(), taskToCancel.getId()));
cancelTasksRequest.setReason("Cancellation timeout of " + timeoutInterval + " is expired");
// force the origin to execute the cancellation as a system user
new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster()
.cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> logger.debug(
"Scheduled cancel task with timeout: {} for original task: {} is successfully completed", timeoutInterval,
cancelTasksRequest.getTaskId()),
e -> logger.error(new ParameterizedMessage("Scheduled cancel task with timeout: {} for original task: {} is failed",
timeoutInterval, cancelTasksRequest.getTaskId()), e))
);
});
wrappedListener.cancellable = client.threadPool().schedule(wrappedListener, timeoutInterval, ThreadPool.Names.GENERIC);
listenerToReturn = wrappedListener;
} catch (Exception ex) {
// if there is any exception in scheduling the cancellation task then continue without it
logger.warn("Failed to schedule the cancellation task for original task: {}, will continue without it", taskToCancel.getId());
}
return listenerToReturn;
}

/**
* Timeout listener which executes the provided runnable after timeout is expired and if a response/failure is not yet received.
* If either a response/failure is received before timeout then the scheduled task is cancelled and response/failure is sent back to
* the original listener.
*/
private static class TimeoutRunnableListener<Response> implements ActionListener<Response>, Runnable {

private static final Logger logger = LogManager.getLogger(TimeoutRunnableListener.class);

// Runnable to execute after timeout
private final TimeValue timeout;
private final ActionListener<Response> originalListener;
private final Runnable timeoutRunnable;
private final AtomicBoolean executeRunnable = new AtomicBoolean(true);
private volatile Scheduler.ScheduledCancellable cancellable;
private final long creationTime;

TimeoutRunnableListener(TimeValue timeout, ActionListener<Response> listener, Runnable runAfterTimeout) {
this.timeout = timeout;
this.originalListener = listener;
this.timeoutRunnable = runAfterTimeout;
this.creationTime = System.nanoTime();
}

@Override public void onResponse(Response response) {
checkAndCancel();
originalListener.onResponse(response);
}

@Override public void onFailure(Exception e) {
checkAndCancel();
originalListener.onFailure(e);
}

@Override public void run() {
try {
if (executeRunnable.compareAndSet(true, false)) {
timeoutRunnable.run();
} // else do nothing since either response/failure is already sent to client
} catch (Exception ex) {
// ignore the exception
logger.error(new ParameterizedMessage("Ignoring the failure to run the provided runnable after timeout of {} with " +
"exception", timeout), ex);
}
}

private void checkAndCancel() {
if (executeRunnable.compareAndSet(true, false)) {
logger.debug("Aborting the scheduled cancel task after {}",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationTime));
// timer has not yet expired so cancel it
cancellable.cancel();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS,
ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
TransportSearchAction.SHARD_COUNT_LIMIT_SETTING,
TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING,
RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE,
RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE,
SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContent;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -158,6 +159,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest,
multiRequest.add(searchRequest);
});
List<SearchRequest> requests = multiRequest.requests();
final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null);
for (SearchRequest request : requests) {
// preserve if it's set on the request
if (preFilterShardSize != null && request.getPreFilterShardSize() == null) {
Expand All @@ -166,6 +168,11 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest,
if (maxConcurrentShardRequests != null) {
request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}
// if cancel_after_time_interval parameter is set at per search request level than that is used otherwise one set at
// multi search request level will be used
if (request.getCancelAfterTimeInterval() == null) {
request.setCancelAfterTimeInterval(cancelAfterTimeInterval);
}
}
return multiRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,8 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
searchRequest.setCcsMinimizeRoundtrips(
request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips()));
}

searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null));
}

/**
Expand Down
Loading

0 comments on commit 70eabe5

Please sign in to comment.