Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Raise errors for HTTP error codes in the generic client #929

Merged
merged 2 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ This section is for maintaining a changelog for all breaking changes for the cli

### Added
- Add missed fields to MultisearchBody: seqNoPrimaryTerm, storedFields, explain, fields, indicesBoost ([#914](https://github.com/opensearch-project/opensearch-java/pull/914))
- Add OpenSearchGenericClient with support for raw HTTP request/responses ([#910](https://github.com/opensearch-project/opensearch-java/pull/910))
- Add OpenSearchGenericClient with support for raw HTTP request/responses ([#910](https://github.com/opensearch-project/opensearch-java/pull/910), [#929](https://github.com/opensearch-project/opensearch-java/pull/929))
- Add missed fields to MultisearchBody: collapse, version, timeout ([#916](https://github.com/opensearch-project/opensearch-java/pull/916)
- Add missed fields to MultisearchBody: ext, rescore and to SearchRequest: ext ([#918](https://github.com/opensearch-project/opensearch-java/pull/918)

Expand Down
6 changes: 6 additions & 0 deletions guides/generic.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ The following sample code gets the `OpenSearchGenericClient` from the `OpenSearc
final OpenSearchGenericClient generic = javaClient().generic();
```

The generic client with default options (`ClientOptions.DEFAULT`) returns the responses as those were received from the server. The generic client could be instructed to raise an `OpenSearchClientException` exception instead if the HTTP status code is not indicating the successful response, for example:

```java
final OpenSearchGenericClient generic = javaClient().generic().witClientOptions(ClientOptions.throwOnHttpErrors());
```

## Sending Simple Request
The following sample code sends a simple request that does not require any payload to be provided (typically, `GET` requests).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ public interface Body extends AutoCloseable {
* @return body as {@link String}
*/
default String bodyAsString() {
return new String(bodyAsBytes(), StandardCharsets.UTF_8);
}

/**
* Gets the body as {@link byte[]}
* @return body as {@link byte[]}
*/
default byte[] bodyAsBytes() {
try (final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (final InputStream in = body()) {
final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
Expand All @@ -77,7 +85,7 @@ default String bodyAsString() {
}

out.flush();
return new String(out.toByteArray(), StandardCharsets.UTF_8);
return out.toByteArray();
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ final class GenericResponse implements Response {
private final Collection<Map.Entry<String, String>> headers;
private final Body body;

GenericResponse(String uri, String protocol, String method, int status, String reason, Collection<Map.Entry<String, String>> headers) {
this(uri, protocol, method, status, reason, headers, null);
}

GenericResponse(
String uri,
String protocol,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.opensearch.generic;

/**
* Exception thrown by API client methods when OpenSearch could not accept or
* process a request.
* <p>
* The {@link #response()} contains the the raw response as returned by the API
* endpoint that was called.
*/
public class OpenSearchClientException extends RuntimeException {

private final Response response;

public OpenSearchClientException(Response response) {
super("Request failed: [" + response.getStatus() + "] " + response.getReason());
this.response = response;
}

/**
* The error response sent by OpenSearch
*/
public Response response() {
return this.response;
}

/**
* Status code returned by OpenSearch. Shortcut for
* {@code response().status()}.
*/
public int status() {
return this.response.getStatus();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.opensearch.client.ApiClient;
Expand All @@ -24,14 +26,37 @@
* Client for the generic HTTP requests.
*/
public class OpenSearchGenericClient extends ApiClient<OpenSearchTransport, OpenSearchGenericClient> {
/**
* Generic client options
*/
public static final class ClientOptions {
private static final ClientOptions DEFAULT = new ClientOptions();

private final Predicate<Integer> error;

private ClientOptions() {
this(statusCode -> false);
}

private ClientOptions(final Predicate<Integer> error) {
this.error = error;
}

public static ClientOptions throwOnHttpErrors() {
return new ClientOptions(statusCode -> statusCode >= 400);
}
}

/**
* Generic endpoint instance
*/
private static final class GenericEndpoint implements org.opensearch.client.transport.GenericEndpoint<Request, Response> {
private final Request request;
private final Predicate<Integer> error;

public GenericEndpoint(Request request) {
public GenericEndpoint(Request request, Predicate<Integer> error) {
this.request = request;
this.error = error;
}

@Override
Expand Down Expand Up @@ -67,24 +92,70 @@ public GenericResponse responseDeserializer(
int status,
String reason,
List<Entry<String, String>> headers,
String contentType,
InputStream body
@Nullable String contentType,
@Nullable InputStream body
) {
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType));
if (isError(status)) {
// Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed
try (Body b = Body.from(body, contentType)) {
if (b != null) {
return new GenericResponse(
uri,
protocol,
method,
status,
reason,
headers,
Body.from(b.bodyAsBytes(), b.contentType())
);
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers);
}
} catch (final IOException ex) {
throw new UncheckedIOException(ex);
}
} else {
return new GenericResponse(uri, protocol, method, status, reason, headers, Body.from(body, contentType));
}
}

@Override
public boolean isError(int statusCode) {
return error.test(statusCode);
}

@Override
public <T extends RuntimeException> T exceptionConverter(int statusCode, @Nullable Response error) {
throw new OpenSearchClientException(error);
}
}

private final ClientOptions clientOptions;

public OpenSearchGenericClient(OpenSearchTransport transport) {
super(transport, null);
this(transport, null, ClientOptions.DEFAULT);
}

public OpenSearchGenericClient(OpenSearchTransport transport, @Nullable TransportOptions transportOptions) {
this(transport, transportOptions, ClientOptions.DEFAULT);
}

public OpenSearchGenericClient(
OpenSearchTransport transport,
@Nullable TransportOptions transportOptions,
ClientOptions clientOptions
) {
super(transport, transportOptions);
this.clientOptions = clientOptions;
}

public OpenSearchGenericClient withClientOptions(ClientOptions clientOptions) {
return new OpenSearchGenericClient(this.transport, this.transportOptions, clientOptions);
}

@Override
public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions transportOptions) {
return new OpenSearchGenericClient(this.transport, transportOptions);
return new OpenSearchGenericClient(this.transport, transportOptions, this.clientOptions);
}

/**
Expand All @@ -94,7 +165,7 @@ public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions t
* @throws IOException I/O exception
*/
public Response execute(Request request) throws IOException {
return transport.performRequest(request, new GenericEndpoint(request), this.transportOptions);
return transport.performRequest(request, new GenericEndpoint(request, clientOptions.error), this.transportOptions);
}

/**
Expand All @@ -103,6 +174,6 @@ public Response execute(Request request) throws IOException {
* @return generic HTTP response future
*/
public CompletableFuture<Response> executeAsync(Request request) {
return transport.performRequestAsync(request, new GenericEndpoint(request), this.transportOptions);
return transport.performRequestAsync(request, new GenericEndpoint(request, clientOptions.error), this.transportOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import javax.annotation.Nullable;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;

/**
* An endpoint links requests and responses to HTTP protocol encoding. It also defines the error response
Expand Down Expand Up @@ -90,4 +92,13 @@ default Map<String, String> headers(RequestT request) {
@Nullable
JsonpDeserializer<ErrorT> errorDeserializer(int statusCode);

/**
* Converts error response to exception instance of type {@code T}
* @param <T> exception type
* @param error error response
* @return exception instance
*/
default <T extends RuntimeException> T exceptionConverter(int statusCode, @Nullable ErrorT error) {
throw new OpenSearchException((ErrorResponse) error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.BufferedHttpEntity;
import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
import org.apache.hc.core5.http.io.entity.EntityUtils;
Expand All @@ -76,8 +77,6 @@
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.NdJsonpSerializable;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.GenericEndpoint;
import org.opensearch.client.transport.GenericSerializable;
Expand Down Expand Up @@ -485,36 +484,68 @@ private <ResponseT, ErrorT> ResponseT prepareResponse(Response clientResp, Endpo

try {
int statusCode = clientResp.getStatusLine().getStatusCode();

if (endpoint.isError(statusCode)) {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null) {
throw new TransportException("Request failed with status code '" + statusCode + "'", new ResponseException(clientResp));
}

if (statusCode == HttpStatus.SC_FORBIDDEN) {
throw new TransportException("Forbidden access", new ResponseException(clientResp));
} else if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
throw new TransportException("Unauthorized access", new ResponseException(clientResp));
} else if (endpoint.isError(statusCode)) {
HttpEntity entity = clientResp.getEntity();
if (entity == null) {
throw new TransportException("Expecting a response body, but none was sent", new ResponseException(clientResp));
}

// We may have to replay it.
entity = new BufferedHttpEntity(entity);

try {
InputStream content = entity.getContent();
try (JsonParser parser = mapper.jsonProvider().createParser(content)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
// TODO: have the endpoint provide the exception constructor
throw new OpenSearchException((ErrorResponse) error);
if (endpoint instanceof GenericEndpoint) {
@SuppressWarnings("unchecked")
final GenericEndpoint<?, ResponseT> rawEndpoint = (GenericEndpoint<?, ResponseT>) endpoint;

final RequestLine requestLine = clientResp.getRequestLine();
final StatusLine statusLine = clientResp.getStatusLine();

// We may have to replay it.
entity = new BufferedHttpEntity(entity);

try (InputStream content = entity.getContent()) {
final ResponseT error = rawEndpoint.responseDeserializer(
requestLine.getUri(),
requestLine.getMethod(),
requestLine.getProtocolVersion().format(),
statusLine.getStatusCode(),
statusLine.getReasonPhrase(),
Arrays.stream(clientResp.getHeaders())
.map(h -> new AbstractMap.SimpleEntry<String, String>(h.getName(), h.getValue()))
.collect(Collectors.toList()),
entity.getContentType(),
content
);
throw rawEndpoint.exceptionConverter(statusCode, error);
}
} catch (MissingRequiredPropertyException errorEx) {
// Could not decode exception, try the response type
} else {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null) {
throw new TransportException(
"Request failed with status code '" + statusCode + "'",
new ResponseException(clientResp)
);
}

// We may have to replay it.
entity = new BufferedHttpEntity(entity);

try {
ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint);
return response;
} catch (Exception respEx) {
// No better luck: throw the original error decoding exception
throw new TransportException("Failed to decode error response", new ResponseException(clientResp));
InputStream content = entity.getContent();
try (JsonParser parser = mapper.jsonProvider().createParser(content)) {
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw endpoint.exceptionConverter(statusCode, error);
}
} catch (MissingRequiredPropertyException errorEx) {
// Could not decode exception, try the response type
try {
ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint);
return response;
} catch (Exception respEx) {
// No better luck: throw the original error decoding exception
throw new TransportException("Failed to decode error response", new ResponseException(clientResp));
}
}
}
} else {
Expand Down
Loading
Loading