Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to disable chunked transfer-encoding #3864

Merged
merged 12 commits into from
Jul 13, 2022
Merged
142 changes: 130 additions & 12 deletions client/rest/src/main/java/org/opensearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.entity.HttpEntityWrapper;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -131,6 +132,29 @@ public class RestClient implements Closeable {
private volatile NodeTuple<List<Node>> nodeTuple;
private final WarningsHandler warningsHandler;
private final boolean compressionEnabled;
private final Optional<Boolean> chunkedEnabled;

RestClient(
CloseableHttpAsyncClient client,
Header[] defaultHeaders,
List<Node> nodes,
String pathPrefix,
FailureListener failureListener,
NodeSelector nodeSelector,
boolean strictDeprecationMode,
boolean compressionEnabled,
boolean chunkedEnabled
) {
this.client = client;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
this.chunkedEnabled = Optional.of(chunkedEnabled);
setNodes(nodes);
}

RestClient(
CloseableHttpAsyncClient client,
Expand All @@ -149,6 +173,7 @@ public class RestClient implements Closeable {
this.nodeSelector = nodeSelector;
this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE;
this.compressionEnabled = compressionEnabled;
this.chunkedEnabled = Optional.empty();
setNodes(nodes);
}

Expand Down Expand Up @@ -583,36 +608,42 @@ private static void addSuppressedException(Exception suppressedException, Except
}
}

private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity, boolean compressionEnabled) {
private HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) {
switch (method.toUpperCase(Locale.ROOT)) {
case HttpDeleteWithEntity.METHOD_NAME:
return addRequestBody(new HttpDeleteWithEntity(uri), entity, compressionEnabled);
return addRequestBody(new HttpDeleteWithEntity(uri), entity);
case HttpGetWithEntity.METHOD_NAME:
return addRequestBody(new HttpGetWithEntity(uri), entity, compressionEnabled);
return addRequestBody(new HttpGetWithEntity(uri), entity);
case HttpHead.METHOD_NAME:
return addRequestBody(new HttpHead(uri), entity, compressionEnabled);
return addRequestBody(new HttpHead(uri), entity);
case HttpOptions.METHOD_NAME:
return addRequestBody(new HttpOptions(uri), entity, compressionEnabled);
return addRequestBody(new HttpOptions(uri), entity);
case HttpPatch.METHOD_NAME:
return addRequestBody(new HttpPatch(uri), entity, compressionEnabled);
return addRequestBody(new HttpPatch(uri), entity);
case HttpPost.METHOD_NAME:
HttpPost httpPost = new HttpPost(uri);
addRequestBody(httpPost, entity, compressionEnabled);
addRequestBody(httpPost, entity);
return httpPost;
case HttpPut.METHOD_NAME:
return addRequestBody(new HttpPut(uri), entity, compressionEnabled);
return addRequestBody(new HttpPut(uri), entity);
case HttpTrace.METHOD_NAME:
return addRequestBody(new HttpTrace(uri), entity, compressionEnabled);
return addRequestBody(new HttpTrace(uri), entity);
default:
throw new UnsupportedOperationException("http method not supported: " + method);
}
}

private static HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity, boolean compressionEnabled) {
private HttpRequestBase addRequestBody(HttpRequestBase httpRequest, HttpEntity entity) {
if (entity != null) {
if (httpRequest instanceof HttpEntityEnclosingRequestBase) {
if (compressionEnabled) {
entity = new ContentCompressingEntity(entity);
if (chunkedEnabled.isPresent()) {
entity = new ContentCompressingEntity(entity, chunkedEnabled.get());
} else {
entity = new ContentCompressingEntity(entity);
}
} else if (chunkedEnabled.isPresent()) {
entity = new ContentHttpEntity(entity, chunkedEnabled.get());
}
((HttpEntityEnclosingRequestBase) httpRequest).setEntity(entity);
} else {
Expand Down Expand Up @@ -782,7 +813,7 @@ private class InternalRequest {
String ignoreString = params.remove("ignore");
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity(), compressionEnabled);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
this.cancellable = Cancellable.fromRequest(httpRequest);
setHeaders(httpRequest, request.getOptions().getHeaders());
setRequestConfig(httpRequest, request.getOptions().getRequestConfig());
Expand Down Expand Up @@ -936,6 +967,7 @@ private static Exception extractAndWrapCause(Exception exception) {
* A gzip compressing entity that also implements {@code getContent()}.
*/
public static class ContentCompressingEntity extends GzipCompressingEntity {
private Optional<Boolean> chunkedEnabled;

/**
* Creates a {@link ContentCompressingEntity} instance with the provided HTTP entity.
Expand All @@ -944,6 +976,18 @@ public static class ContentCompressingEntity extends GzipCompressingEntity {
*/
public ContentCompressingEntity(HttpEntity entity) {
super(entity);
this.chunkedEnabled = Optional.empty();
}

/**
* Creates a {@link ContentCompressingEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
* @param chunkedEnabled force enable/disable chunked transfer-encoding.
*/
public ContentCompressingEntity(HttpEntity entity, boolean chunkedEnabled) {
super(entity);
this.chunkedEnabled = Optional.of(chunkedEnabled);
}

@Override
Expand All @@ -954,6 +998,80 @@ public InputStream getContent() throws IOException {
}
return out.asInput();
}

/**
* A gzip compressing entity doesn't work with chunked encoding with sigv4
*
* @return false
*/
@Override
public boolean isChunked() {
return chunkedEnabled.orElseGet(super::isChunked);
}

/**
* A gzip entity requires content length in http headers.
*
* @return content length of gzip entity
*/
@Override
public long getContentLength() {
if (chunkedEnabled.isPresent()) {
if (chunkedEnabled.get()) {
return -1L;
} else {
long size;
try (InputStream is = getContent()) {
size = is.readAllBytes().length;
} catch (IOException ex) {
size = -1L;
}

return size;
}
} else {
return super.getContentLength();
}
}
}

/**
* An entity that lets the caller specify the return value of {@code isChunked()}.
*/
public static class ContentHttpEntity extends HttpEntityWrapper {
private Optional<Boolean> chunkedEnabled;

/**
* Creates a {@link ContentHttpEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
*/
public ContentHttpEntity(HttpEntity entity) {
super(entity);
this.chunkedEnabled = Optional.empty();
}

/**
* Creates a {@link ContentHttpEntity} instance with the provided HTTP entity.
*
* @param entity the HTTP entity.
* @param chunkedEnabled force enable/disable chunked transfer-encoding.
*/
public ContentHttpEntity(HttpEntity entity, boolean chunkedEnabled) {
super(entity);
this.chunkedEnabled = Optional.of(chunkedEnabled);
}

/**
* A chunked entity requires transfer-encoding:chunked in http headers
* which requires isChunked to be true
*
* @return true
*/
@Override
public boolean isChunked() {
return chunkedEnabled.orElseGet(super::isChunked);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.security.PrivilegedAction;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Helps creating a new {@link RestClient}. Allows to set the most common http client configuration options when internally
Expand Down Expand Up @@ -84,6 +85,7 @@ public final class RestClientBuilder {
private NodeSelector nodeSelector = NodeSelector.ANY;
private boolean strictDeprecationMode = false;
private boolean compressionEnabled = false;
private Optional<Boolean> chunkedEnabled;

/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
Expand All @@ -100,6 +102,7 @@ public final class RestClientBuilder {
}
}
this.nodes = nodes;
this.chunkedEnabled = Optional.empty();
}

/**
Expand Down Expand Up @@ -238,6 +241,16 @@ public RestClientBuilder setCompressionEnabled(boolean compressionEnabled) {
return this;
}

/**
* Whether the REST client should use Transfer-Encoding: chunked for requests or not"
*
* @param chunkedEnabled force enable/disable chunked transfer-encoding.
*/
public RestClientBuilder setChunkedEnabled(boolean chunkedEnabled) {
this.chunkedEnabled = Optional.of(chunkedEnabled);
return this;
}

/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
Expand All @@ -248,16 +261,34 @@ public RestClient build() {
CloseableHttpAsyncClient httpClient = AccessController.doPrivileged(
(PrivilegedAction<CloseableHttpAsyncClient>) this::createHttpClient
);
RestClient restClient = new RestClient(
httpClient,
defaultHeaders,
nodes,
pathPrefix,
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled
);

RestClient restClient = null;

if (chunkedEnabled.isPresent()) {
restClient = new RestClient(
httpClient,
defaultHeaders,
nodes,
pathPrefix,
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled,
chunkedEnabled.get()
);
} else {
restClient = new RestClient(
httpClient,
defaultHeaders,
nodes,
pathPrefix,
failureListener,
nodeSelector,
strictDeprecationMode,
compressionEnabled
);
}

httpClient.start();
return restClient;
}
Expand Down
Loading