Use chunked REST serialization for large REST responses #88311

Merged

Changes shown from 60 of 99 commits.

Commits:
63dc225
getting there
original-brownbear Jun 20, 2022
d542b74
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 20, 2022
b323fd9
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 20, 2022
ba7cdea
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 20, 2022
fa63c8c
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 21, 2022
f9a3215
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 21, 2022
1ad294d
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 21, 2022
f022190
checkpoint
original-brownbear Jun 21, 2022
a87cfb3
cp
original-brownbear Jun 21, 2022
75b036d
cp
original-brownbear Jun 21, 2022
299cdba
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 22, 2022
333da0d
fix compilation
original-brownbear Jun 22, 2022
d9b1f8a
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 22, 2022
84dd503
works better
original-brownbear Jun 22, 2022
6077797
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 22, 2022
a95590e
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 24, 2022
07bfaab
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 27, 2022
21cdd21
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 28, 2022
00ee6a0
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 28, 2022
6c3b963
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jun 29, 2022
9f86863
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 4, 2022
5b9219a
bck
original-brownbear Jul 4, 2022
582dfe8
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 4, 2022
5f50d87
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 5, 2022
8a28b04
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 5, 2022
d327f2d
cleanup test
original-brownbear Jul 5, 2022
649481e
simpler API
original-brownbear Jul 5, 2022
144c227
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 5, 2022
c061b73
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 6, 2022
a087a5b
Use chunked REST serialization for large REST responses
original-brownbear Jul 6, 2022
7f85c19
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 6, 2022
53c5047
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 7, 2022
7358a38
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 7, 2022
2e9e5d5
more fixes + testing
original-brownbear Jul 7, 2022
c313eb6
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 7, 2022
9b7281b
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 8, 2022
d67bb7f
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 12, 2022
6756bd5
Merge remote-tracking branch 'elastic/master' into chunked-rest-response
original-brownbear Jul 12, 2022
d33bc0b
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 25, 2022
e7d7893
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 25, 2022
eb7bd9b
test the actual thing
original-brownbear Jul 25, 2022
c75f73b
Merge remote-tracking branch 'origin/chunked-rest-response' into chun…
original-brownbear Jul 25, 2022
1501636
progress
original-brownbear Jul 25, 2022
fff84b6
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 25, 2022
13b015d
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 25, 2022
d28a31b
test + fix error handling for chunked
original-brownbear Jul 25, 2022
2214a31
api
original-brownbear Jul 25, 2022
b4091d7
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 25, 2022
00f97e5
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 26, 2022
fbb56fd
moar api
original-brownbear Jul 26, 2022
260b474
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 28, 2022
fa93925
noicer APIs
original-brownbear Jul 28, 2022
62bd60f
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 28, 2022
007aa02
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 28, 2022
fc3897a
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 29, 2022
0368b58
docs
original-brownbear Jul 29, 2022
b319e1b
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 29, 2022
aee391f
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 30, 2022
19724aa
start test
original-brownbear Jul 30, 2022
19306e2
test chunked serialization
original-brownbear Jul 31, 2022
99cea44
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Jul 31, 2022
5479a8e
drier + doc
original-brownbear Jul 31, 2022
605a867
cleanup and docs
original-brownbear Jul 31, 2022
f13702b
moar cleanup
original-brownbear Jul 31, 2022
9d0379c
less noisy
original-brownbear Jul 31, 2022
79c0fe2
fix comment
original-brownbear Jul 31, 2022
c29de50
nicer
original-brownbear Jul 31, 2022
2ee3615
Update docs/changelog/88311.yaml
original-brownbear Aug 1, 2022
3ec4782
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 3, 2022
82086c0
change API
original-brownbear Aug 3, 2022
34a1657
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 3, 2022
a7e2e5d
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 9, 2022
35ad972
use iterator
original-brownbear Aug 9, 2022
e3073b5
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 18, 2022
a3d7b97
CR: comments
original-brownbear Aug 18, 2022
e3a979e
move assertion
original-brownbear Aug 18, 2022
9b06800
fix bug and add test
original-brownbear Aug 18, 2022
ee09bb9
don't assume write exec ordering
original-brownbear Aug 18, 2022
f422312
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 25, 2022
84940fc
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 25, 2022
16c9856
cr: comments
original-brownbear Aug 25, 2022
1532b41
rename
original-brownbear Aug 25, 2022
e6ddf04
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 26, 2022
857be16
remove pointless extra flush
original-brownbear Aug 26, 2022
1a4b364
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 26, 2022
1a76fed
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Aug 27, 2022
7e6cd61
handle HEAD request correctly
original-brownbear Aug 27, 2022
71dc538
reword
original-brownbear Aug 27, 2022
f874c72
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Sep 1, 2022
cc312d7
add assertions
original-brownbear Sep 1, 2022
1154131
CR: test adjustments
original-brownbear Sep 1, 2022
6516482
CR: test adjustments
original-brownbear Sep 1, 2022
dcf73e4
validate response body fully
original-brownbear Sep 1, 2022
911e91d
randomize write order
original-brownbear Sep 1, 2022
98ff648
test the other way around
original-brownbear Sep 1, 2022
2943f1b
add ChunkedRestResponseBodyTests
original-brownbear Sep 1, 2022
ca30f8c
Merge remote-tracking branch 'elastic/main' into chunked-rest-response
original-brownbear Sep 6, 2022
9743942
CR: comments
original-brownbear Sep 6, 2022
99a5aa1
single point for content type
original-brownbear Sep 6, 2022
org/elasticsearch/http/netty4/Netty4ChunkedHttpResponse.java (new file)
@@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestStatus;

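/**
 * A pipelined HTTP response whose body is not serialized up front but is written to the
 * channel in multiple chunks via its {@link ChunkedRestResponseBody}.
 */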
public class Netty4ChunkedHttpResponse extends DefaultHttpResponse implements Netty4RestResponse {

private final int sequence;

private final ChunkedRestResponseBody body;

Netty4ChunkedHttpResponse(int sequence, HttpVersion version, RestStatus status, ChunkedRestResponseBody body) {
super(version, HttpResponseStatus.valueOf(status.getStatus()));
this.sequence = sequence;
this.body = body;
}

public ChunkedRestResponseBody body() {
return body;
}

@Override
public int getSequence() {
return sequence;
}

@Override
public void addHeader(String name, String value) {
headers().add(name, value);
}

@Override
public boolean containsHeader(String name) {
return headers().contains(name);
}
}
org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java
@@ -11,6 +11,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.compression.JdkZlibEncoder;
@@ -26,11 +27,17 @@

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.Netty4WriteThrottlingHandler;
import org.elasticsearch.transport.netty4.NettyAllocator;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -41,13 +48,22 @@

/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their corresponding requests.
* This handler also throttles write operations and will not pass any writes to the next handler so long as the channel is not writable.
*/
public class Netty4HttpPipeliningHandler extends ChannelDuplexHandler {

private final Logger logger;

private final int maxEventsHeld;
private final PriorityQueue<Tuple<Netty4HttpResponse, ChannelPromise>> outboundHoldingQueue;
private final PriorityQueue<Tuple<? extends Netty4RestResponse, ChannelPromise>> outboundHoldingQueue;

private record ChunkedWrite(PromiseCombiner combiner, ChannelPromise onDone, Netty4ChunkedHttpResponse response) {}

/**
* The current {@link ChunkedWrite}, if a chunked write is currently in progress.
*/
@Nullable
private ChunkedWrite currentChunkedWrite;

/*
* The current read and write sequence numbers. Read sequence numbers are attached to requests in the order they are read from the
@@ -119,36 +135,32 @@ protected void handlePipelinedRequest(ChannelHandlerContext ctx, Netty4HttpReque
}

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) {
assert msg instanceof Netty4HttpResponse : "Invalid message type: " + msg.getClass();
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws IOException {
assert msg instanceof Netty4RestResponse : "Invalid message type: " + msg.getClass();
boolean success = false;
try {
final Netty4HttpResponse response = (Netty4HttpResponse) msg;
if (response.getSequence() != writeSequence) {
assert response.getSequence() > writeSequence
: "response sequence [" + response.getSequence() + "] we below write sequence [" + writeSequence + "]";
final Netty4RestResponse restResponse = (Netty4RestResponse) msg;
if (restResponse.getSequence() != writeSequence) {
assert restResponse.getSequence() > writeSequence
: "response sequence [" + restResponse.getSequence() + "] we below write sequence [" + writeSequence + "]";
if (outboundHoldingQueue.size() >= maxEventsHeld) {
int eventCount = outboundHoldingQueue.size() + 1;
throw new IllegalStateException(
"Too many pipelined events [" + eventCount + "]. Max events allowed [" + maxEventsHeld + "]."
);
}
// response is not at the current sequence number so we add it to the outbound queue and return
outboundHoldingQueue.add(new Tuple<>(response, promise));
outboundHoldingQueue.add(new Tuple<>(restResponse, promise));
success = true;
return;
}

// response is at the current sequence number and does not need to wait for any other response to be written so we write
// it out directly
doWrite(ctx, response, promise);
doWrite(ctx, restResponse, promise);
success = true;
// see if we have any queued up responses that became writable due to the above write
while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
final Tuple<Netty4HttpResponse, ChannelPromise> top = outboundHoldingQueue.poll();
assert top != null : "we know the outbound holding queue to not be empty at this point";
doWrite(ctx, top.v1(), top.v2());
}
drainQueuedNowWritable(ctx);
} catch (IllegalStateException e) {
ctx.channel().close();
} finally {
@@ -158,6 +170,14 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
}
}

private void drainQueuedNowWritable(ChannelHandlerContext ctx) throws IOException {
while (outboundHoldingQueue.isEmpty() == false && outboundHoldingQueue.peek().v1().getSequence() == writeSequence) {
final Tuple<? extends Netty4RestResponse, ChannelPromise> top = outboundHoldingQueue.poll();
assert top != null : "we know the outbound holding queue to not be empty at this point";
doWrite(ctx, top.v1(), top.v2());
}
}

private static final String DO_NOT_SPLIT = "es.unsafe.do_not_split_http_responses";

private static final boolean DO_NOT_SPLIT_HTTP_RESPONSES;
@@ -169,6 +189,14 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
SPLIT_THRESHOLD = (int) (NettyAllocator.suggestedMaxAllocationSize() * 0.99);
}

private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException {
if (readyResponse instanceof Netty4HttpResponse) {
doWrite(ctx, (Netty4HttpResponse) readyResponse, promise);
} else {
doWrite(ctx, (Netty4ChunkedHttpResponse) readyResponse, promise);
}
}

/**
* Split up large responses to prevent batch compression {@link JdkZlibEncoder} down the pipeline.
*/
@@ -181,6 +209,27 @@ private void doWrite(ChannelHandlerContext ctx, Netty4HttpResponse readyResponse
writeSequence++;
}

private void doWrite(ChannelHandlerContext ctx, Netty4ChunkedHttpResponse readyResponse, ChannelPromise promise) throws IOException {
assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]";
Contributor (review comment): Can we assert this in the outer doWrite, ensuring that no other writes occur when a chunked write is active?

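A minimal sketch of that suggestion (hypothetical placement; the assertion is simply hoisted into the dispatching doWrite so that it guards both response types):

    private void doWrite(ChannelHandlerContext ctx, Netty4RestResponse readyResponse, ChannelPromise promise) throws IOException {
        // hypothetical: no response of any kind may start while a chunked write is still in flight
        assert currentChunkedWrite == null : "unexpected existing write [" + currentChunkedWrite + "]";
        if (readyResponse instanceof Netty4HttpResponse) {
            doWrite(ctx, (Netty4HttpResponse) readyResponse, promise);
        } else {
            doWrite(ctx, (Netty4ChunkedHttpResponse) readyResponse, promise);
        }
    }
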
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
currentChunkedWrite = new ChunkedWrite(combiner, promise, readyResponse);
final ChannelPromise first = ctx.newPromise();
combiner.add((Future<Void>) first);
enqueueWrite(ctx, readyResponse, first);
while (ctx.channel().isWritable()) {
if (writeChunk(ctx, combiner, readyResponse.body())) {
Contributor (review comment): This looks slightly odd to me; it might be me not fully understanding whether a write can happen between the channel becoming writable and the channelWritabilityChanged method being called. But if enqueueWrite ever enqueues the readyResponse, it seems wrong to start writing the chunk now? I suppose you assume that queuedWrites is empty, but I am not sure that is always the case. If a split write happens first and then a chunked write (due to pipelining), would we then not risk the chunk being written here first?

Member Author (original-brownbear): 🤦 Again, you're right, this might be an issue we could run into. I pushed ee09bb9 to cover the case you point out, where we only queue the first chunk because there's a pending split write of a huge non-chunked message.

finishChunkedWrite();
return;
}
}
}

private void finishChunkedWrite() {
currentChunkedWrite.combiner.finish(currentChunkedWrite.onDone);
Contributor (review comment): I think netty does catch all here, but it would feel safer to clear currentChunkedWrite and increment writeSequence either before calling finish or in a finally, in case any of the listeners throws.

Member Author (original-brownbear): Yeah, unless we have a bug where we finish the same thing twice we shouldn't get any throwing here, but it still makes sense to me -> put a finally here.

currentChunkedWrite = null;
writeSequence++;
}
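A sketch of the shape this method takes once that change is applied (assuming the finally simply wraps the combiner finish, per the exchange above):

    private void finishChunkedWrite() {
        try {
            currentChunkedWrite.combiner.finish(currentChunkedWrite.onDone);
        } finally {
            // reset state and advance the pipeline even if a promise listener throws
            currentChunkedWrite = null;
            writeSequence++;
        }
    }
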

private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
HttpResponse response = new DefaultHttpResponse(msg.protocolVersion(), msg.status(), msg.headers());
@@ -193,15 +242,15 @@ private void splitAndWrite(ChannelHandlerContext ctx, Netty4HttpResponse msg, Ch
combiner.finish(promise);
}

public void channelWritabilityChanged(ChannelHandlerContext ctx) {
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException {
if (ctx.channel().isWritable()) {
doFlush(ctx);
}
ctx.fireChannelWritabilityChanged();
}

@Override
public void flush(ChannelHandlerContext ctx) {
public void flush(ChannelHandlerContext ctx) throws IOException {
if (doFlush(ctx) == false) {
ctx.flush();
}
Expand All @@ -218,7 +267,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
*
* @return true if a call to this method resulted in a call to {@link ChannelHandlerContext#flush()} on the given {@code ctx}
*/
private boolean doFlush(ChannelHandlerContext ctx) {
private boolean doFlush(ChannelHandlerContext ctx) throws IOException {
assert ctx.executor().inEventLoop();
final Channel channel = ctx.channel();
if (channel.isActive() == false) {
@@ -227,8 +276,28 @@ private boolean doFlush(ChannelHandlerContext ctx) {
}
boolean needsFlush = true;
while (channel.isWritable()) {
final WriteOperation currentWrite = queuedWrites.poll();
WriteOperation currentWrite = queuedWrites.poll();
if (currentWrite == null) {
drainQueuedNowWritable(ctx);
if (channel.isWritable() == false) {
// try flushing to make channel writable again, loop will only continue if channel becomes writable again
ctx.flush();
needsFlush = false;
}
if (channel.isWritable() == false) {
break;
}
currentWrite = queuedWrites.poll();
}
if (currentWrite == null) {
// TODO: this is confusing, handle the outstanding first chunk of a chunked write nicer than by putting it in the
// queued writes
if (currentChunkedWrite != null) {
if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.response.body())) {
finishChunkedWrite();
continue;
Contributor (review comment): I'd think we would want to continue rather than break also if writeChunk returns false?

Member Author (original-brownbear): Right 🤦 that was a real bug, thanks for catching it. I added a specific test with a message that outputs small chunks to reproduce and cover this now (see 9b06800).
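A sketch of the corrected control flow in doFlush (assuming the fix in 9b06800 keeps draining chunks while the channel stays writable, whether or not the body finished on this call):

    if (currentChunkedWrite != null) {
        if (writeChunk(ctx, currentChunkedWrite.combiner, currentChunkedWrite.response.body())) {
            finishChunkedWrite();
        }
        // keep looping either way: more chunks may remain, or the next queued response may now be writable
        continue;
    }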

}
}
break;
}
ctx.write(currentWrite.msg, currentWrite.promise);
@@ -248,6 +317,19 @@ private boolean doFlush(ChannelHandlerContext ctx) {
return true;
}

private boolean writeChunk(ChannelHandlerContext ctx, PromiseCombiner combiner, ChunkedRestResponseBody body) throws IOException {
final ReleasableBytesReference bytes = body.encodeChunk(
Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE,
serverTransport.recycler()
);
final ByteBuf content = Netty4Utils.toByteBuf(bytes);
final boolean done = body.isDone();
final ChannelFuture f = ctx.write(done ? new DefaultLastHttpContent(content) : new DefaultHttpContent(content));
Contributor (review comment): Can we assert that:

    assert done || length > 0;

to ensure progress of chunking in the wide variety of implementations we will eventually see?

Member Author (original-brownbear): We can be more extreme IMO and just assert length > 0 and done == false before that. We shouldn't be calling this once we're done, and we should never emit an empty buffer, not even for the last invocation, because that would just be needless overhead for an empty last chunk. Adding those assertions.

Contributor (@henningandersen, Sep 6, 2022): I am not sure we can always have that guarantee for the last chunk, but as long as we emit JSON it sounds plausible, so OK by me for now.
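A sketch of what those assertions might look like inside writeChunk (hypothetical placement; the names are taken from the surrounding code):

    // hypothetical: callers must stop once the body reports completion
    assert body.isDone() == false : "should not write chunks for an already-finished body";
    final ReleasableBytesReference bytes = body.encodeChunk(
        Netty4WriteThrottlingHandler.MAX_BYTES_PER_WRITE,
        serverTransport.recycler()
    );
    // hypothetical: every encodeChunk call must make progress with a non-empty chunk
    assert bytes.length() > 0 : "encodeChunk emitted an empty chunk";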

f.addListener(ignored -> bytes.close());
combiner.add(f);
return done;
}

private void failQueuedWrites() {
WriteOperation queuedWrite;
while ((queuedWrite = queuedWrites.poll()) != null) {
@@ -257,21 +339,29 @@ private void failQueuedWrites() {

@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
List<Tuple<Netty4HttpResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();
if (currentChunkedWrite != null) {
safeFailPromise(currentChunkedWrite.onDone, new ClosedChannelException());
currentChunkedWrite = null;
}
List<Tuple<? extends Netty4RestResponse, ChannelPromise>> inflightResponses = removeAllInflightResponses();

if (inflightResponses.isEmpty() == false) {
ClosedChannelException closedChannelException = new ClosedChannelException();
for (Tuple<Netty4HttpResponse, ChannelPromise> inflightResponse : inflightResponses) {
try {
inflightResponse.v2().setFailure(closedChannelException);
} catch (RuntimeException e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
for (Tuple<? extends Netty4RestResponse, ChannelPromise> inflightResponse : inflightResponses) {
safeFailPromise(inflightResponse.v2(), closedChannelException);
}
}
ctx.close(promise);
}

private void safeFailPromise(ChannelPromise promise, Exception ex) {
try {
promise.setFailure(ex);
} catch (RuntimeException e) {
logger.error("unexpected error while releasing pipelined http responses", e);
}
}

private Future<Void> enqueueWrite(ChannelHandlerContext ctx, HttpObject msg) {
final ChannelPromise p = ctx.newPromise();
enqueueWrite(ctx, msg, p);
@@ -299,8 +389,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

private List<Tuple<Netty4HttpResponse, ChannelPromise>> removeAllInflightResponses() {
ArrayList<Tuple<Netty4HttpResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
private List<Tuple<? extends Netty4RestResponse, ChannelPromise>> removeAllInflightResponses() {
ArrayList<Tuple<? extends Netty4RestResponse, ChannelPromise>> responses = new ArrayList<>(outboundHoldingQueue);
outboundHoldingQueue.clear();
return responses;
}
org/elasticsearch/http/netty4/Netty4HttpRequest.java
@@ -22,6 +22,8 @@

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.ChunkedRestResponseBody;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;
@@ -237,6 +239,11 @@ public Netty4HttpResponse createResponse(RestStatus status, BytesReference conte
return new Netty4HttpResponse(sequence, request.protocolVersion(), status, contentRef);
}

@Override
public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody content) {
return new Netty4ChunkedHttpResponse(sequence, request.protocolVersion(), status, content);
}

@Override
public Exception getInboundException() {
return inboundException;
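For orientation, a hedged sketch of the producer side: a trivial ChunkedRestResponseBody implementation that streams a fixed payload in sizeHint-bounded slices. The method names (isDone, encodeChunk) follow the calls visible in this diff, and the content-type member follows the "single point for content type" commit; the exact interface shape and this class itself are illustrative assumptions, not part of the PR.

    import org.apache.lucene.util.BytesRef;

    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.bytes.ReleasableBytesReference;
    import org.elasticsearch.common.recycler.Recycler;
    import org.elasticsearch.rest.ChunkedRestResponseBody;

    // Hypothetical illustration only.
    class StaticChunkedBody implements ChunkedRestResponseBody {

        private final BytesReference payload;
        private int offset = 0;

        StaticChunkedBody(BytesReference payload) {
            this.payload = payload;
        }

        @Override
        public boolean isDone() {
            return offset >= payload.length();
        }

        @Override
        public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> recycler) {
            // serialize at most sizeHint bytes per call so the pipelining handler can throttle writes
            final int length = Math.min(sizeHint, payload.length() - offset);
            final BytesReference slice = payload.slice(offset, length);
            offset += length;
            return ReleasableBytesReference.wrap(slice); // nothing pooled to release for a static payload
        }

        @Override
        public String getResponseContentTypeString() {
            return "application/json"; // assumed member, see the "single point for content type" commit
        }
    }
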
org/elasticsearch/http/netty4/Netty4HttpResponse.java
@@ -13,11 +13,10 @@
import io.netty.handler.codec.http.HttpVersion;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.http.HttpResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.netty4.Netty4Utils;

public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpResponse {
public class Netty4HttpResponse extends DefaultFullHttpResponse implements Netty4RestResponse {

private final int sequence;

@@ -26,6 +25,7 @@ public class Netty4HttpResponse extends DefaultFullHttpResponse implements HttpR
this.sequence = sequence;
}

@Override
public int getSequence() {
return sequence;
}
org/elasticsearch/http/netty4/Netty4RestResponse.java (new file)
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import org.elasticsearch.http.HttpResponse;

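/**
 * Common interface for the pipelined Netty REST responses in this package (full and chunked),
 * exposing the pipelining sequence number used to order writes.
 */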
public interface Netty4RestResponse extends HttpResponse {

int getSequence();
}