Skip to content

HLRC support for getTask #35166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Nov 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,38 @@ private <Req, Resp> Resp internalPerformRequest(Req request,
throw new IOException("Unable to parse response body for " + response, e);
}
}

/**
* Defines a helper method for requests that can 404 and in which case will return an empty Optional
* otherwise tries to parse the response body
*/
protected final <Req extends Validatable, Resp> Optional<Resp> performRequestAndParseOptionalEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
RequestOptions options,
CheckedFunction<XContentParser, Resp, IOException> entityParser
) throws IOException {
Optional<ValidationException> validationException = request.validate();
if (validationException != null && validationException.isPresent()) {
throw validationException.get();
}
Request req = requestConverter.apply(request);
req.setOptions(options);
Response response;
try {
response = client.performRequest(req);
} catch (ResponseException e) {
if (RestStatus.NOT_FOUND.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
return Optional.empty();
}
throw parseResponseException(e);
}

try {
return Optional.of(parseEntity(response.getEntity(), entityParser));
} catch (Exception e) {
throw new IOException("Unable to parse response body for " + response, e);
}
}

/**
* @deprecated If creating a new HLRC ReST API call, consider creating new actions instead of reusing server actions. The Validation
Expand Down Expand Up @@ -1538,6 +1570,62 @@ public void onFailure(Exception exception) {
}
};
}

/**
* Async request which returns empty Optionals in the case of 404s or parses entity into an Optional
*/
protected final <Req extends Validatable, Resp> void performRequestAsyncAndParseOptionalEntity(Req request,
CheckedFunction<Req, Request, IOException> requestConverter,
RequestOptions options,
CheckedFunction<XContentParser, Resp, IOException> entityParser,
ActionListener<Optional<Resp>> listener) {
Optional<ValidationException> validationException = request.validate();
if (validationException != null && validationException.isPresent()) {
listener.onFailure(validationException.get());
return;
}
Request req;
try {
req = requestConverter.apply(request);
} catch (Exception e) {
listener.onFailure(e);
return;
}
req.setOptions(options);
ResponseListener responseListener = wrapResponseListener404sOptional(response -> parseEntity(response.getEntity(),
entityParser), listener);
client.performRequestAsync(req, responseListener);
}

final <Resp> ResponseListener wrapResponseListener404sOptional(CheckedFunction<Response, Resp, IOException> responseConverter,
ActionListener<Optional<Resp>> actionListener) {
return new ResponseListener() {
@Override
public void onSuccess(Response response) {
try {
actionListener.onResponse(Optional.of(responseConverter.apply(response)));
} catch (Exception e) {
IOException ioe = new IOException("Unable to parse response body for " + response, e);
onFailure(ioe);
}
}

@Override
public void onFailure(Exception exception) {
if (exception instanceof ResponseException) {
ResponseException responseException = (ResponseException) exception;
Response response = responseException.getResponse();
if (RestStatus.NOT_FOUND.getStatus() == response.getStatusLine().getStatusCode()) {
actionListener.onResponse(Optional.empty());
} else {
actionListener.onFailure(parseResponseException(responseException));
}
} else {
actionListener.onFailure(exception);
}
}
};
}

/**
* Converts a {@link ResponseException} obtained from the low level REST client into an {@link ElasticsearchException}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.client.tasks.GetTaskRequest;
import org.elasticsearch.client.tasks.GetTaskResponse;

import java.io.IOException;
import java.util.Optional;

import static java.util.Collections.emptySet;

Expand Down Expand Up @@ -67,6 +70,34 @@ public void listAsync(ListTasksRequest request, RequestOptions options, ActionLi
restHighLevelClient.performRequestAsyncAndParseEntity(request, TasksRequestConverters::listTasks, options,
ListTasksResponse::fromXContent, listener, emptySet());
}

/**
* Get a task using the Task Management API.
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public Optional<GetTaskResponse> get(GetTaskRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseOptionalEntity(request, TasksRequestConverters::getTask, options,
GetTaskResponse::fromXContent);
}

/**
* Get a task using the Task Management API.
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener an actionlistener that takes an optional response (404s are returned as an empty Optional)
*/
public void getAsync(GetTaskRequest request, RequestOptions options, ActionListener<Optional<GetTaskResponse>> listener) {

restHighLevelClient.performRequestAsyncAndParseOptionalEntity(request, TasksRequestConverters::getTask, options,
GetTaskResponse::fromXContent, listener);
}

/**
* Cancel one or more cluster tasks using the Task Management API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
import org.elasticsearch.client.tasks.GetTaskRequest;

final class TasksRequestConverters {

Expand Down Expand Up @@ -54,4 +56,16 @@ static Request listTasks(ListTasksRequest listTaskRequest) {
.putParam("group_by", "none");
return request;
}

static Request getTask(GetTaskRequest getTaskRequest) {
String endpoint = new EndpointBuilder().addPathPartAsIs("_tasks")
.addPathPartAsIs(getTaskRequest.getNodeId() + ":" + Long.toString(getTaskRequest.getTaskId()))
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
params.withTimeout(getTaskRequest.getTimeout())
.withWaitForCompletion(getTaskRequest.getWaitForCompletion());
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.tasks;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Objects;
import java.util.Optional;

public class GetTaskRequest implements Validatable {
private final String nodeId;
private final long taskId;
private boolean waitForCompletion = false;
private TimeValue timeout = null;

public GetTaskRequest(String nodeId, long taskId) {
this.nodeId = nodeId;
this.taskId = taskId;
}

public String getNodeId() {
return nodeId;
}

public long getTaskId() {
return taskId;
}

/**
* Should this request wait for all found tasks to complete?
*/
public boolean getWaitForCompletion() {
return waitForCompletion;
}

/**
* Should this request wait for all found tasks to complete?
*/
public GetTaskRequest setWaitForCompletion(boolean waitForCompletion) {
this.waitForCompletion = waitForCompletion;
return this;
}

/**
* Timeout to wait for any async actions this request must take. It must take anywhere from 0 to 2.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Im not sure i like these comments.. I know they are in server, but what does "it must take anywhere from 0 to 2" mean...?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if there is a upper bound, maybe we should validate it in validate()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a copy-and-paste from the server equivalent. I'll take a closer look

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the comment. I don't see any validation code related to it so did not add anything to a validate() method. (General note: if we duplicate validation logic client and server-side is that not trappy code-maintenance-wise?)

*/
public TimeValue getTimeout() {
return timeout;
}

/**
* Timeout to wait for any async actions this request must take.
*/
public GetTaskRequest setTimeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}

@Override
public Optional<ValidationException> validate() {
final ValidationException validationException = new ValidationException();
if (timeout != null && !waitForCompletion) {
validationException.addValidationError("Timeout settings are only accepted if waitForCompletion is also set");
}
if (validationException.validationErrors().isEmpty()) {
return Optional.empty();
}
return Optional.of(validationException);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, taskId, waitForCompletion, timeout);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
GetTaskRequest other = (GetTaskRequest) obj;
return Objects.equals(nodeId, other.nodeId) &&
taskId == other.taskId &&
waitForCompletion == other.waitForCompletion &&
Objects.equals(timeout, other.timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.tasks;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.tasks.TaskInfo;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;

public class GetTaskResponse {
private final boolean completed;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spacing is messed up here and below the getters

private final TaskInfo taskInfo;
public static final ParseField COMPLETED = new ParseField("completed");
public static final ParseField TASK = new ParseField("task");

public GetTaskResponse(boolean completed, TaskInfo taskInfo) {
this.completed = completed;
this.taskInfo = taskInfo;
}

public boolean isCompleted() {
return completed;
}

public TaskInfo getTaskInfo() {
return taskInfo;
}

private static final ConstructingObjectParser<GetTaskResponse, Void> PARSER = new ConstructingObjectParser<>("get_task",
true, a -> new GetTaskResponse((boolean) a[0], (TaskInfo) a[1]));
static {
PARSER.declareBoolean(constructorArg(), COMPLETED);
PARSER.declareObject(constructorArg(), (p, c) -> TaskInfo.fromXContent(p), TASK);
}

public static GetTaskResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some minor new lines formatting would be nice to fix

Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ public void testMethodsVisibility() {
"parseResponseException",
"performRequest",
"performRequestAndParseEntity",
"performRequestAndParseOptionalEntity",
"performRequestAsync",
"performRequestAsyncAndParseEntity"};
"performRequestAsyncAndParseEntity",
"performRequestAsyncAndParseOptionalEntity"
};

final Set<String> protectedMethods = Arrays.stream(RestHighLevelClient.class.getDeclaredMethods())
.filter(method -> Modifier.isProtected(method.getModifiers()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -675,8 +676,7 @@ public void testApiNamingConventions() throws Exception {
"indices.put_alias",
"mtermvectors",
"render_search_template",
"scripts_painless_execute",
"tasks.get"
"scripts_painless_execute"
};
//These API are not required for high-level client feature completeness
String[] notRequiredApi = new String[] {
Expand Down Expand Up @@ -777,8 +777,11 @@ private void assertSyncMethod(Method method, String apiName) {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), equalTo("boolean"));
} else {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), endsWith("Response"));
// It's acceptable for 404s to be represented as empty Optionals
if (!method.getReturnType().isAssignableFrom(Optional.class)) {
assertThat("the return type for method [" + method + "] is incorrect",
method.getReturnType().getSimpleName(), endsWith("Response"));
}
}

assertEquals("incorrect number of exceptions for method [" + method + "]", 1, method.getExceptionTypes().length);
Expand Down
Loading