Skip to content

Move transport decoding and aggregation to server #48263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 73 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
a0878e6
WIP
Tim-Brooks Aug 7, 2019
c4abd27
WIP tests
Tim-Brooks Aug 8, 2019
68da522
WIP
Tim-Brooks Aug 8, 2019
6b68651
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Aug 8, 2019
c3e9874
WIP
Tim-Brooks Aug 9, 2019
9ac20c8
WIP
Tim-Brooks Aug 10, 2019
3a8643c
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Aug 14, 2019
0066edf
Changes
Tim-Brooks Aug 16, 2019
965eca3
WIP
Tim-Brooks Aug 16, 2019
68720c4
Changes
Tim-Brooks Aug 16, 2019
ada1292
Work on aggregation
Tim-Brooks Aug 17, 2019
c8e0d35
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Aug 20, 2019
fb3be69
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Aug 26, 2019
1966ca9
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Aug 28, 2019
9c1095e
WIP
Tim-Brooks Aug 28, 2019
e3f1f34
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Aug 29, 2019
19f09ca
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Sep 5, 2019
dc3de32
WIP
Tim-Brooks Sep 5, 2019
6250303
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Oct 2, 2019
efb7886
Work on paring down work
Tim-Brooks Oct 3, 2019
44bf7fa
WIP
Tim-Brooks Oct 4, 2019
67f635c
Implement nio test
Tim-Brooks Oct 4, 2019
71bf38b
Work on netty integration
Tim-Brooks Oct 4, 2019
b936c9e
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Oct 18, 2019
a5b649e
Tests
Tim-Brooks Oct 18, 2019
b476490
Fix
Tim-Brooks Oct 18, 2019
90d3769
Fixes
Tim-Brooks Oct 19, 2019
380dbd7
Fix issues
Tim-Brooks Oct 19, 2019
bb7c8db
Cleanup
Tim-Brooks Oct 19, 2019
4b5349a
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Oct 25, 2019
6d737ba
Refcounting
Tim-Brooks Oct 25, 2019
74cea7c
Tests
Tim-Brooks Oct 26, 2019
e28336d
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Nov 27, 2019
3f5f9b0
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Dec 5, 2019
6cba066
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Dec 12, 2019
27671ad
Changes
Tim-Brooks Dec 12, 2019
d70a290
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Dec 20, 2019
dc46e10
Work on decoding
Tim-Brooks Dec 21, 2019
6df1b60
Changes
Tim-Brooks Dec 21, 2019
20994c7
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Jan 9, 2020
d64d449
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Jan 15, 2020
8db094c
Changes
Tim-Brooks Jan 15, 2020
ab90f84
move back
Tim-Brooks Jan 15, 2020
fde092d
Fix issue
Tim-Brooks Jan 15, 2020
aede3ba
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 16, 2020
f41a5c3
Set version
Tim-Brooks Mar 16, 2020
5aaaf20
Changes
Tim-Brooks Mar 16, 2020
5b722c2
Fixes
Tim-Brooks Mar 16, 2020
4fc06a4
Fix
Tim-Brooks Mar 17, 2020
1e0b9e2
Rework decoder
Tim-Brooks Mar 17, 2020
fc231d2
WIP
Tim-Brooks Mar 17, 2020
85e1915
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 17, 2020
04eaff2
Changes
Tim-Brooks Mar 19, 2020
9610065
Changes
Tim-Brooks Mar 19, 2020
72ec229
Changes
Tim-Brooks Mar 19, 2020
82479ba
Add error handler
Tim-Brooks Mar 19, 2020
a313fed
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 19, 2020
2a70c3d
Change
Tim-Brooks Mar 20, 2020
f607a12
Changes
Tim-Brooks Mar 20, 2020
5a60df4
Changes
Tim-Brooks Mar 20, 2020
4971d05
Changes
Tim-Brooks Mar 20, 2020
871e40f
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 23, 2020
e761215
Change
Tim-Brooks Mar 23, 2020
e25e25f
Changes
Tim-Brooks Mar 24, 2020
ad20c99
Changes
Tim-Brooks Mar 24, 2020
98f3211
Fix tests
Tim-Brooks Mar 24, 2020
ab9cb05
Review changes
Tim-Brooks Mar 25, 2020
b24b3b3
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 25, 2020
6f5cbbf
Review changes
Tim-Brooks Mar 26, 2020
ddf36bb
Review changes
Tim-Brooks Mar 26, 2020
4aa607d
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 26, 2020
aacc96c
Change
Tim-Brooks Mar 27, 2020
9dff5cc
Merge remote-tracking branch 'upstream/master' into incremental_inbou…
Tim-Brooks Mar 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.Transports;

import java.nio.channels.ClosedChannelException;
Expand All @@ -45,23 +49,23 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

private WriteOperation currentWrite;
private final InboundPipeline pipeline;

Netty4MessageChannelHandler(Netty4Transport transport) {
Netty4MessageChannelHandler(PageCacheRecycler recycler, Netty4Transport transport) {
this.transport = transport;
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert Transports.assertTransportThread();
assert msg instanceof ByteBuf : "Expected message type ByteBuf, found: " + msg.getClass();

final ByteBuf buffer = (ByteBuf) msg;
try {
Channel channel = ctx.channel();
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
transport.inboundMessage(channelAttribute.get(), Netty4Utils.toBytesReference(buffer));
} finally {
buffer.release();
Netty4TcpChannel channel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
final BytesReference wrapped = Netty4Utils.toBytesReference(buffer);
try (ReleasableBytesReference reference = new ReleasableBytesReference(wrapped, buffer::release)) {
pipeline.handleBytes(channel, reference);
}
}

Expand Down Expand Up @@ -104,6 +108,7 @@ public void flush(ChannelHandlerContext ctx) {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
doFlush(ctx);
Releasables.closeWhileHandlingException(pipeline);
super.channelInactive(ctx);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,8 @@ protected class ClientChannelInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
}

@Override
Expand All @@ -349,8 +348,7 @@ protected void initChannel(Channel ch) throws Exception {
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, true, name, ch.newSucceededFuture());
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(pageCacheRecycler, Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected Function<DiscoveryNode, TcpChannelFactory> clientChannelFactoryFunctio
return (n) -> new TcpChannelFactoryImpl(profileSettings, true);
}

protected abstract class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {
protected abstract static class TcpChannelFactory extends ChannelFactory<NioTcpServerChannel, NioTcpChannel> {

protected TcpChannelFactory(ProfileSettings profileSettings) {
super(profileSettings.tcpNoDelay, profileSettings.tcpKeepAlive, profileSettings.tcpKeepIdle, profileSettings.tcpKeepInterval,
Expand All @@ -162,8 +162,8 @@ private TcpChannelFactoryImpl(ProfileSettings profileSettings, boolean isClient)
@Override
public NioTcpChannel createChannel(NioSelector selector, SocketChannel channel, Config.Socket socketConfig) {
NioTcpChannel nioChannel = new NioTcpChannel(isClient == false, profileName, channel);
TcpReadWriteHandler handler = new TcpReadWriteHandler(nioChannel, NioTransport.this);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
TcpReadWriteHandler handler = new TcpReadWriteHandler(nioChannel, pageCacheRecycler, NioTransport.this);
BytesChannelContext context = new BytesChannelContext(nioChannel, selector, socketConfig, exceptionHandler, handler,
new InboundChannelBuffer(pageAllocator));
nioChannel.setContext(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,47 @@
package org.elasticsearch.transport.nio;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.nio.BytesWriteHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.Page;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.TcpTransport;

import java.io.IOException;

public class TcpReadWriteHandler extends BytesWriteHandler {

private final NioTcpChannel channel;
private final TcpTransport transport;
private final InboundPipeline pipeline;

public TcpReadWriteHandler(NioTcpChannel channel, TcpTransport transport) {
public TcpReadWriteHandler(NioTcpChannel channel, PageCacheRecycler recycler, TcpTransport transport) {
this.channel = channel;
this.transport = transport;
this.pipeline = new InboundPipeline(transport.getVersion(), recycler, transport::inboundMessage, transport::inboundDecodeException);
}

@Override
public int consumeReads(InboundChannelBuffer channelBuffer) throws IOException {
BytesReference bytesReference = BytesReference.fromByteBuffers(channelBuffer.sliceBuffersTo(channelBuffer.getIndex()));
return transport.consumeNetworkReads(channel, bytesReference);
Page[] pages = channelBuffer.sliceAndRetainPagesTo(channelBuffer.getIndex());
BytesReference[] references = new BytesReference[pages.length];
for (int i = 0; i < pages.length; ++i) {
references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer());
}
Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages);
try (ReleasableBytesReference reference = new ReleasableBytesReference(new CompositeBytesReference(references), releasable)) {
pipeline.handleBytes(channel, reference);
return reference.length();
}
}

@Override
public void close() {
Releasables.closeWhileHandlingException(pipeline);
super.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ static BytesReference fromByteBuffers(ByteBuffer[] buffers) {
if (bufferCount == 0) {
return BytesArray.EMPTY;
} else if (bufferCount == 1) {
return new ByteBufferReference(buffers[0]);
return fromByteBuffer(buffers[0]);
} else {
ByteBufferReference[] references = new ByteBufferReference[bufferCount];
for (int i = 0; i < bufferCount; ++i) {
Expand All @@ -102,6 +102,13 @@ static BytesReference fromByteBuffers(ByteBuffer[] buffers) {
}
}

/**
* Returns BytesReference composed of the provided ByteBuffer.
*/
static BytesReference fromByteBuffer(ByteBuffer buffer) {
return new ByteBufferReference(buffer);
}

/**
* Returns the byte at the specified index. Need to be between 0 and length.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -33,21 +34,43 @@
* An extension to {@link BytesReference} that requires releasing its content. This
* class exists to make it explicit when a bytes reference needs to be released, and when not.
*/
public final class ReleasableBytesReference implements Releasable, BytesReference {
public final class ReleasableBytesReference extends AbstractRefCounted implements Releasable, BytesReference {

public static final Releasable NO_OP = () -> {};
private final BytesReference delegate;
private final Releasable releasable;

public ReleasableBytesReference(BytesReference delegate, Releasable releasable) {
super("bytes-reference");
this.delegate = delegate;
this.releasable = releasable;
}

public static ReleasableBytesReference wrap(BytesReference reference) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to only be used in tests and should probably live in test code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used by production code now. And I imagine it's use will grow as more things need to interface with the Releasable version (#50107 type of work).

return new ReleasableBytesReference(reference, NO_OP);
}

@Override
public void close() {
protected void closeInternal() {
Releasables.close(releasable);
}

public ReleasableBytesReference retain() {
incRef();
return this;
}

public ReleasableBytesReference retainedSlice(int from, int length) {
BytesReference slice = delegate.slice(from, length);
incRef();
return new ReleasableBytesReference(slice, this);
}

@Override
public void close() {
decRef();
}

@Override
public byte get(int index) {
return delegate.get(index);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface Compressor {

boolean isCompressed(BytesReference bytes);

int headerLength();

StreamInput streamInput(StreamInput in) throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public boolean isCompressed(BytesReference bytes) {
return true;
}

@Override
public int headerLength() {
return HEADER.length;
}

@Override
public StreamInput streamInput(StreamInput in) throws IOException {
final byte[] headerBytes = new byte[HEADER.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static void close(Releasable... releasables) {
}

/** Release the provided {@link Releasable}s, ignoring exceptions. */
public static void closeWhileHandlingException(Iterable<Releasable> releasables) {
public static void closeWhileHandlingException(Iterable<? extends Releasable> releasables) {
close(releasables, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,12 @@ public void writeTo(StreamOutput out) throws IOException {
* Reads the headers from the stream into the current context
*/
public void readHeaders(StreamInput in) throws IOException {
final Tuple<Map<String, String>, Map<String, Set<String>>> streamTuple = readHeadersFromStream(in);
final Map<String, String> requestHeaders = streamTuple.v1();
final Map<String, Set<String>> responseHeaders = streamTuple.v2();
setHeaders(readHeadersFromStream(in));
}

public void setHeaders(Tuple<Map<String, String>, Map<String, Set<String>>> headerTuple) {
final Map<String, String> requestHeaders = headerTuple.v1();
final Map<String, Set<String>> responseHeaders = headerTuple.v2();
final ThreadContextStruct struct;
if (requestHeaders.isEmpty() && responseHeaders.isEmpty()) {
struct = ThreadContextStruct.EMPTY;
Expand Down
Loading