Skip to content

Commit 005f209

Browse files
committed
Revert "Compress async search responses before storing (elastic#74766)"
This reverts commit 55175de.
1 parent 55175de commit 005f209

File tree

3 files changed

+11
-36
lines changed

3 files changed

+11
-36
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.common.TriFunction;
2929
import org.elasticsearch.common.breaker.CircuitBreaker;
3030
import org.elasticsearch.common.bytes.BytesReference;
31-
import org.elasticsearch.common.compress.CompressorFactory;
3231
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
3332
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
3433
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -60,7 +59,6 @@
6059
import org.elasticsearch.xpack.core.security.authc.Authentication;
6160
import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
6261

63-
import java.io.FilterOutputStream;
6462
import java.io.IOException;
6563
import java.io.InputStream;
6664
import java.io.OutputStream;
@@ -584,28 +582,18 @@ boolean ensureAuthenticatedUserIsSame(Map<String, String> originHeaders, Authent
584582
}
585583

586584
private void writeResponse(R response, OutputStream os) throws IOException {
587-
os = new FilterOutputStream(os) {
588-
@Override
589-
public void close() {
590-
// do not close the output
591-
}
592-
};
593585
final Version minNodeVersion = clusterService.state().nodes().getMinNodeVersion();
594-
Version.writeVersion(minNodeVersion, new OutputStreamStreamOutput(os));
595-
if (minNodeVersion.onOrAfter(Version.V_8_0_0)) {
596-
os = CompressorFactory.COMPRESSOR.threadLocalOutputStream(os);
597-
}
598-
try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(os)) {
599-
out.setVersion(minNodeVersion);
600-
response.writeTo(out);
601-
}
586+
final OutputStreamStreamOutput out = new OutputStreamStreamOutput(os);
587+
out.setVersion(minNodeVersion);
588+
Version.writeVersion(minNodeVersion, out);
589+
response.writeTo(out);
602590
}
603591

604592
/**
605593
* Decode the provided base-64 bytes into a {@link AsyncSearchResponse}.
606594
*/
607595
private R decodeResponse(CharBuffer encodedBuffer) throws IOException {
608-
InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
596+
final InputStream encodedIn = Base64.getDecoder().wrap(new InputStream() {
609597
@Override
610598
public int read() {
611599
if (encodedBuffer.hasRemaining()) {
@@ -615,12 +603,9 @@ public int read() {
615603
}
616604
}
617605
});
618-
final Version version = Version.readVersion(new InputStreamStreamInput(encodedIn));
619-
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
620-
if (version.onOrAfter(Version.V_8_0_0)) {
621-
encodedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(encodedIn);
622-
}
623606
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(encodedIn), registry)) {
607+
final Version version = Version.readVersion(in);
608+
assert version.onOrBefore(Version.CURRENT) : version + " >= " + Version.CURRENT;
624609
in.setVersion(version);
625610
return reader.read(in);
626611
}

x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AsyncEqlSearchActionIT.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
import org.elasticsearch.action.index.IndexRequestBuilder;
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1717
import org.elasticsearch.common.Strings;
18-
import org.elasticsearch.common.compress.CompressorFactory;
1918
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
20-
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
2119
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2220
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2321
import org.elasticsearch.common.io.stream.StreamInput;
@@ -41,7 +39,6 @@
4139
import org.hamcrest.Description;
4240
import org.junit.After;
4341

44-
import java.io.InputStream;
4542
import java.nio.ByteBuffer;
4643
import java.util.ArrayList;
4744
import java.util.Base64;
@@ -289,10 +286,8 @@ public StoredAsyncResponse<EqlSearchResponse> getStoredRecord(String id) throws
289286
if (doc.isExists()) {
290287
String value = doc.getSource().get("result").toString();
291288
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
292-
final Version version = Version.readVersion(buf);
293-
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
294-
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
295-
in.setVersion(version);
289+
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
290+
in.setVersion(Version.readVersion(in));
296291
return new StoredAsyncResponse<>(EqlSearchResponse::new, in);
297292
}
298293
}

x-pack/plugin/sql/src/internalClusterTest/java/org/elasticsearch/xpack/sql/action/AsyncSqlSearchActionIT.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
import org.elasticsearch.action.get.GetResponse;
1515
import org.elasticsearch.action.index.IndexRequestBuilder;
1616
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17-
import org.elasticsearch.common.compress.CompressorFactory;
1817
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
19-
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
2018
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
2119
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2220
import org.elasticsearch.common.io.stream.StreamInput;
@@ -39,7 +37,6 @@
3937
import org.elasticsearch.xpack.sql.proto.Protocol;
4038
import org.junit.After;
4139

42-
import java.io.InputStream;
4340
import java.nio.ByteBuffer;
4441
import java.util.ArrayList;
4542
import java.util.Base64;
@@ -288,10 +285,8 @@ public StoredAsyncResponse<SqlQueryResponse> getStoredRecord(String id) throws E
288285
if (doc.isExists()) {
289286
String value = doc.getSource().get("result").toString();
290287
try (ByteBufferStreamInput buf = new ByteBufferStreamInput(ByteBuffer.wrap(Base64.getDecoder().decode(value)))) {
291-
final Version version = Version.readVersion(buf);
292-
final InputStream compressedIn = CompressorFactory.COMPRESSOR.threadLocalInputStream(buf);
293-
try (StreamInput in = new NamedWriteableAwareStreamInput(new InputStreamStreamInput(compressedIn), registry)) {
294-
in.setVersion(version);
288+
try (StreamInput in = new NamedWriteableAwareStreamInput(buf, registry)) {
289+
in.setVersion(Version.readVersion(in));
295290
return new StoredAsyncResponse<>(SqlQueryResponse::new, in);
296291
}
297292
}

0 commit comments

Comments
 (0)