Skip to content

Specify reason whenever async search gets cancelled #57761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ public void setExpirationTime(long expirationTimeMillis) {
/**
* Cancels the running task and its children.
*/
public void cancelTask(Runnable runnable) {
public void cancelTask(Runnable runnable, String reason) {
if (isCancelled() == false && isCancelling.compareAndSet(false, true)) {
CancelTasksRequest req = new CancelTasksRequest().setTaskId(searchId.getTaskId());
CancelTasksRequest req = new CancelTasksRequest().setTaskId(searchId.getTaskId()).setReason(reason);
client.admin().cluster().cancelTasks(req, new ActionListener<>() {
@Override
public void onResponse(CancelTasksResponse cancelTasksResponse) {
Expand Down Expand Up @@ -316,8 +316,6 @@ private AsyncSearchResponse getResponseWithHeaders() {
return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
}



// checks if the search task should be cancelled
private synchronized void checkCancellation() {
long now = System.currentTimeMillis();
Expand All @@ -326,7 +324,7 @@ private synchronized void checkCancellation() {
// we cancel the search task if the initial submit task was cancelled,
// this is needed because the task cancellation mechanism doesn't
// handle the cancellation of grand-children.
cancelTask(() -> {});
cancelTask(() -> {}, checkSubmitCancellation.getAsBoolean() ? "submit was cancelled" : "async search has expired");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void cancelTaskAndDeleteResult(AsyncExecutionId searchId, ActionListener<Acknowl
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]", searchId.getEncoded()), exc);
listener.onFailure(exc);
}
})));
})), "cancelled by user");
} else {
// the task was not found (already cancelled, already completed, or invalid id?)
// we fail if the response is not found in the index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void onResponse(AsyncSearchResponse searchResponse) {
// the user cancelled the submit so we don't store anything
// and propagate the failure
Exception cause = new TaskCancelledException(submitTask.getReasonCancelled());
onFatalFailure(searchTask, cause, searchResponse.isRunning(), submitListener);
onFatalFailure(searchTask, cause, searchResponse.isRunning(),
"submit task is cancelled", submitListener);
} else {
final String docId = searchTask.getExecutionId().getDocId();
// creates the fallback response if the node crashes/restarts in the middle of the request
Expand All @@ -117,12 +118,13 @@ public void onResponse(IndexResponse r) {

@Override
public void onFailure(Exception exc) {
onFatalFailure(searchTask, exc, searchResponse.isRunning(), submitListener);
onFatalFailure(searchTask, exc, searchResponse.isRunning(),
"unable to store initial response", submitListener);
}
});
}
} catch (Exception exc) {
onFatalFailure(searchTask, exc, searchResponse.isRunning(), submitListener);
onFatalFailure(searchTask, exc, searchResponse.isRunning(), "generic error", submitListener);
}
} else {
// the task completed within the timeout so the response is sent back to the user
Expand Down Expand Up @@ -157,15 +159,16 @@ public AsyncSearchTask createTask(long id, String type, String action, TaskId pa
return searchRequest;
}

private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shouldCancel, ActionListener<AsyncSearchResponse> listener) {
private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shouldCancel, String cancelReason,
ActionListener<AsyncSearchResponse> listener) {
if (shouldCancel && task.isCancelled() == false) {
task.cancelTask(() -> {
try {
task.addCompletionListener(finalResponse -> taskManager.unregister(task));
} finally {
listener.onFailure(error);
}
});
}, "fatal failure: " + cancelReason);
} else {
try {
task.addCompletionListener(finalResponse -> taskManager.unregister(task));
Expand Down