Skip to content

Commit

Permalink
fix for bug: #3640
Browse files Browse the repository at this point in the history
Signed-off-by: Jitendra Kumar <jkjitend@amazon.com>
  • Loading branch information
kumjiten committed Jun 23, 2022
1 parent 7005b9e commit 45ec5ed
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 0 deletions.
35 changes: 35 additions & 0 deletions client/rest/src/main/java/org/opensearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,41 @@ public InputStream getContent() throws IOException {
}
return out.asInput();
}

/**
* A gzip compressing enrity doesn't worked with chunked encoding with sigv4
*
* @return false
*/
@Override
public boolean isChunked() {
return false;
}

/**
* A gzip entity require to content length in http headers
* as it doesn't work with chunked encoding for sigv4
*
* @return content lenght of gzip entity
*/
@Override
public long getContentLength() {
long size = 0;
int chunk = 0;
byte[] buffer = new byte[1024];

try {
InputStream is = getContent();

while ((chunk = is.read(buffer)) != -1) {
size += chunk;
}
} catch (Exception ex) {
throw new RuntimeException("failed to get compressed content lenght: " + ex.getMessage());
}

return size;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.client;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RestClientCompressionTests extends RestClientTestCase {

private static HttpServer httpServer;

@BeforeClass
public static void startHttpServer() throws Exception {
httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.createContext("/", new GzipResponseHandler());
httpServer.start();
}

@AfterClass
public static void stopHttpServers() throws IOException {
httpServer.stop(0);
httpServer = null;
}

/**
* A response handler that accepts gzip-encoded data and replies request and response encoding values
* followed by the request body. The response is compressed if "Accept-Encoding" is "gzip".
*/
private static class GzipResponseHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {

// Decode body (if any)
String contentEncoding = exchange.getRequestHeaders().getFirst("Content-Encoding");
String contentLength = exchange.getRequestHeaders().getFirst("Content-Length");
InputStream body = exchange.getRequestBody();
boolean compressedRequest = false;
if ("gzip".equals(contentEncoding)) {
body = new GZIPInputStream(body);
compressedRequest = true;
}
byte[] bytes = readAll(body);
boolean compress = "gzip".equals(exchange.getRequestHeaders().getFirst("Accept-Encoding"));
if (compress) {
exchange.getResponseHeaders().add("Content-Encoding", "gzip");
}

exchange.sendResponseHeaders(200, 0);

// Encode response if needed
OutputStream out = exchange.getResponseBody();
if (compress) {
out = new GZIPOutputStream(out);
}

// Outputs <request-encoding|null>#<response-encoding|null>#<request-body>
out.write(String.valueOf(contentEncoding).getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write((compress ? "gzip" : "null").getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write((compressedRequest ? contentLength : "null").getBytes(StandardCharsets.UTF_8));
out.write('#');
out.write(bytes);
out.close();

exchange.close();
}
}

/** Read all bytes of an input stream and close it. */
private static byte[] readAll(InputStream in) throws IOException {
byte[] buffer = new byte[1024];
ByteArrayOutputStream bos = new ByteArrayOutputStream();
int len = 0;
while ((len = in.read(buffer)) > 0) {
bos.write(buffer, 0, len);
}
in.close();
return bos.toByteArray();
}

private RestClient createClient(boolean enableCompression) {
InetSocketAddress address = httpServer.getAddress();
return RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
.setCompressionEnabled(enableCompression)
.build();
}

public void testCompressingClientWithContentLengthSync() throws Exception {
RestClient restClient = createClient(true);

Request request = new Request("POST", "/");
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));

Response response = restClient.performRequest(request);

HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);
// Content-Encoding#Accept-Encoding#Content-Length#Content
Assert.assertEquals("gzip#gzip#38#compressing client", content);

restClient.close();
}

public void testCompressingClientContentLengthAsync() throws Exception {
InetSocketAddress address = httpServer.getAddress();
RestClient restClient = RestClient.builder(new HttpHost(address.getHostString(), address.getPort(), "http"))
.setCompressionEnabled(true)
.build();

Request request = new Request("POST", "/");
request.setEntity(new StringEntity("compressing client", ContentType.TEXT_PLAIN));

FutureResponse futureResponse = new FutureResponse();
restClient.performRequestAsync(request, futureResponse);
Response response = futureResponse.get();

// Server should report it had a compressed request and sent back a compressed response
HttpEntity entity = response.getEntity();
String content = new String(readAll(entity.getContent()), StandardCharsets.UTF_8);

// Content-Encoding#Accept-Encoding#Content-Length#Content
Assert.assertEquals("gzip#gzip#38#compressing client", content);

restClient.close();
}

public static class FutureResponse extends CompletableFuture<Response> implements ResponseListener {
@Override
public void onSuccess(Response response) {
this.complete(response);
}

@Override
public void onFailure(Exception exception) {
this.completeExceptionally(exception);
}
}
}

0 comments on commit 45ec5ed

Please sign in to comment.