Skip to content

Introduce Netty 4 #19526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jul 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
46cef63
Introduce Netty 4
jasontedor Jul 20, 2016
dd88f56
Remove use of deprecated field in Netty4CorsConfig
jasontedor Jul 21, 2016
cee2a90
Cleaner exception unwrapping on transport layer
jasontedor Jul 21, 2016
2aa6763
Update Netty 4 SHAs to latest snapshots
jasontedor Jul 21, 2016
621161b
Cleanup TransportServiceAdapter#received/sent
jasontedor Jul 21, 2016
e57f1cd
Netty 4 resource management
jasontedor Jul 21, 2016
fc320bc
Simplify close condition in Netty 4 HTTP channel
jasontedor Jul 22, 2016
184083c
Move initializer for Netty 4 HTTP request params
jasontedor Jul 22, 2016
77254b2
Simplify REST requests
jasontedor Jul 22, 2016
ed8355d
Default transport client to Netty 3
jasontedor Jul 22, 2016
816018c
Clarifying exception swallowing in TCP decode
jasontedor Jul 22, 2016
c4c9c87
Fix BytesRestResponseTests test case
jasontedor Jul 22, 2016
2b2caea
Fix resource leak in Netty 4
jasontedor Jul 22, 2016
93317e5
Update Netty 4 SHAs to latest snapshots
jasontedor Jul 22, 2016
18d53d3
Fix NPE in Reindex retry tests
jasontedor Jul 22, 2016
b458cb1
Run packaged REST tests with Netty 4 module
jasontedor Jul 22, 2016
702bbe4
Revert size header frame decoding changes
jasontedor Jul 22, 2016
35b6c64
Use Elastic Netty snapshot artifacts
jasontedor Jul 22, 2016
df19f09
Rename lambda future variable
jasontedor Jul 22, 2016
2573dca
Clarify bytes reference to composite buffer
jasontedor Jul 22, 2016
463a4f7
Update Netty 4 SHAs to latest snapshots
jasontedor Jul 23, 2016
9960643
Set netty.assert.buglevel in the client
jasontedor Jul 23, 2016
9a4b46f
Revert renaming of client:transport
jasontedor Jul 23, 2016
0da5ebe
Preserve headers from the thread context
jasontedor Jul 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ subprojects {
"org.elasticsearch.test:logger-usage:${version}": ':test:logger-usage',
// for transport client
"org.elasticsearch.plugin:transport-netty3-client:${version}": ':modules:transport-netty3',
"org.elasticsearch.plugin:transport-netty4-client:${version}": ':modules:transport-netty4',
"org.elasticsearch.plugin:reindex-client:${version}": ':modules:reindex',
"org.elasticsearch.plugin:lang-mustache-client:${version}": ':modules:lang-mustache',
"org.elasticsearch.plugin:percolator-client:${version}": ':modules:percolator',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ class BuildPlugin implements Plugin<Project> {
url "http://s3.amazonaws.com/download.elasticsearch.org/lucenesnapshots/${revision}"
}
}
repos.maven {
name 'netty-snapshots'
url "http://s3.amazonaws.com/download.elasticsearch.org/nettysnapshots/20160722"
}
}

/** Returns a closure which can be used with a MavenPom for removing transitive dependencies. */
Expand Down
1 change: 1 addition & 0 deletions client/transport/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ group = 'org.elasticsearch.client'
dependencies {
compile "org.elasticsearch:elasticsearch:${version}"
compile project(path: ':modules:transport-netty3', configuration: 'runtime')
compile project(path: ':modules:transport-netty4', configuration: 'runtime')
compile project(path: ':modules:reindex', configuration: 'runtime')
compile project(path: ':modules:lang-mustache', configuration: 'runtime')
compile project(path: ':modules:percolator', configuration: 'runtime')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,47 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport.client;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.percolator.PercolatorPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.mustache.MustachePlugin;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.Netty4Plugin;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import java.util.List;

/**
* A builder to create an instance of {@link TransportClient}
* This class pre-installs the {@link Netty3Plugin}, {@link ReindexPlugin}, {@link PercolatorPlugin}, and {@link MustachePlugin}
* This class pre-installs the
* {@link Netty3Plugin},
* {@link Netty4Plugin},
* {@link ReindexPlugin},
* {@link PercolatorPlugin},
* and {@link MustachePlugin}
* for the client. These plugins are all elasticsearch core modules required.
*/
@SuppressWarnings({"unchecked","varargs"})
public class PreBuiltTransportClient extends TransportClient {
private static final Collection<Class<? extends Plugin>> PRE_INSTALLED_PLUGINS = Collections.unmodifiableList(Arrays.asList(
TransportPlugin.class, ReindexPlugin.class, PercolatorPlugin.class, MustachePlugin.class));

private static final Collection<Class<? extends Plugin>> PRE_INSTALLED_PLUGINS =
Collections.unmodifiableList(
Arrays.asList(
Netty3Plugin.class,
Netty4Plugin.class,
TransportPlugin.class,
ReindexPlugin.class,
PercolatorPlugin.class,
MustachePlugin.class));

@SafeVarargs
public PreBuiltTransportClient(Settings settings, Class<? extends Plugin>... plugins) {
Expand All @@ -50,14 +67,25 @@ public PreBuiltTransportClient(Settings settings, Collection<Class<? extends Plu
super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS));
}

/**
* The default transport implementation for the transport client.
*/
public static final class TransportPlugin extends Netty3Plugin {
// disable assertions for permissions since we might not have the permissions here
// compared to if we are loaded as a real module to the es server
public TransportPlugin(Settings settings) {
super(Settings.builder().put("netty.assert.buglevel", false).put(settings).build());
public static final class TransportPlugin extends Plugin {

private static final Setting<Boolean> ASSERT_NETTY_BUGLEVEL =
Setting.boolSetting("netty.assert.buglevel", true, Setting.Property.NodeScope);

@Override
public List<Setting<?>> getSettings() {
return Collections.singletonList(ASSERT_NETTY_BUGLEVEL);
}

@Override
public Settings additionalSettings() {
return Settings.builder()
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty3Plugin.NETTY_TRANSPORT_NAME)
.put(NetworkModule.HTTP_TYPE_KEY, Netty3Plugin.NETTY_HTTP_TRANSPORT_NAME)
.put("netty.assert.buglevel", true)
.build();
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport.client;

import com.carrotsearch.randomizedtesting.RandomizedTest;
Expand Down Expand Up @@ -57,4 +58,5 @@ public void testInstallPluginTwice() {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

Expand All @@ -36,24 +35,24 @@ public class PagedBytesReference extends BytesReference {
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;

private final BigArrays bigarrays;
protected final ByteArray bytearray;
protected final ByteArray byteArray;
private final int offset;
private final int length;

public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
this(bigarrays, bytearray, 0, length);
public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
this(bigarrays, byteArray, 0, length);
}

public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) {
public PagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int from, int length) {
this.bigarrays = bigarrays;
this.bytearray = bytearray;
this.byteArray = byteArray;
this.offset = from;
this.length = length;
}

@Override
public byte get(int index) {
return bytearray.get(offset + index);
return byteArray.get(offset + index);
}

@Override
Expand All @@ -66,14 +65,14 @@ public BytesReference slice(int from, int length) {
if (from < 0 || (from + length) > length()) {
throw new IllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]");
}
return new PagedBytesReference(bigarrays, bytearray, offset + from, length);
return new PagedBytesReference(bigarrays, byteArray, offset + from, length);
}

@Override
public BytesRef toBytesRef() {
BytesRef bref = new BytesRef();
// if length <= pagesize this will dereference the page, or materialize the byte[]
bytearray.get(offset, length, bref);
byteArray.get(offset, length, bref);
return bref;
}

Expand All @@ -95,7 +94,7 @@ public final BytesRefIterator iterator() {
@Override
public BytesRef next() throws IOException {
if (nextFragmentSize != 0) {
final boolean materialized = bytearray.get(offset + position, nextFragmentSize, slice);
final boolean materialized = byteArray.get(offset + position, nextFragmentSize, slice);
assert materialized == false : "iteration should be page aligned but array got materialized";
position += nextFragmentSize;
final int remaining = length - position;
Expand All @@ -111,6 +110,6 @@ public BytesRef next() throws IOException {

@Override
public long ramBytesUsed() {
return bytearray.ramBytesUsed();
return byteArray.ramBytesUsed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@
*/
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {

public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
super(bigarrays, bytearray, length);
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
super(bigarrays, byteArray, length);
}

@Override
public void close() {
Releasables.close(bytearray);
Releasables.close(byteArray);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ public interface ReleasableBytesStream extends BytesStream {

@Override
ReleasablePagedBytesReference bytes();

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public class BytesStreamOutput extends StreamOutput implements BytesStream {

protected final BigArrays bigarrays;
protected final BigArrays bigArrays;

protected ByteArray bytes;
protected int count;
Expand All @@ -57,9 +57,9 @@ public BytesStreamOutput(int expectedSize) {
this(expectedSize, BigArrays.NON_RECYCLING_INSTANCE);
}

protected BytesStreamOutput(int expectedSize, BigArrays bigarrays) {
this.bigarrays = bigarrays;
this.bytes = bigarrays.newByteArray(expectedSize);
protected BytesStreamOutput(int expectedSize, BigArrays bigArrays) {
this.bigArrays = bigArrays;
this.bytes = bigArrays.newByteArray(expectedSize);
}

@Override
Expand Down Expand Up @@ -100,7 +100,7 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {
public void reset() {
// shrink list of pages
if (bytes.size() > BigArrays.PAGE_SIZE_IN_BYTES) {
bytes = bigarrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
bytes = bigArrays.resize(bytes, BigArrays.PAGE_SIZE_IN_BYTES);
}

// go back to start
Expand Down Expand Up @@ -145,7 +145,7 @@ public int size() {

@Override
public BytesReference bytes() {
return new PagedBytesReference(bigarrays, bytes, count);
return new PagedBytesReference(bigArrays, bytes, count);
}

/**
Expand All @@ -157,7 +157,7 @@ public long ramBytesUsed() {
}

private void ensureCapacity(int offset) {
bytes = bigarrays.grow(bytes, offset);
bytes = bigArrays.grow(bytes, offset);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public ReleasableBytesStreamOutput(BigArrays bigarrays) {
super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
}

public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigarrays) {
super(expectedSize, bigarrays);
public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays);
}

@Override
public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigarrays, bytes, count);
return new ReleasablePagedBytesReference(bigArrays, bytes, count);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ public interface Releasable extends Closeable {

@Override
void close();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@
public interface HttpServerAdapter {

void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.transport.BoundTransportAddress;

/**
*
*/
public interface HttpServerTransport extends LifecycleComponent {

BoundTransportAddress boundAddress();
Expand All @@ -34,4 +31,5 @@ public interface HttpServerTransport extends LifecycleComponent {
HttpStats stats();

void httpServerAdapter(HttpServerAdapter httpServerAdapter);

}
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,5 @@ public RestRequest request() {
public boolean detailedErrorsEnabled() {
return detailedErrorsEnabled;
}

}
2 changes: 2 additions & 0 deletions core/src/main/java/org/elasticsearch/rest/RestChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* A channel used to construct bytes / builder based outputs, and send responses.
*/
public interface RestChannel {

XContentBuilder newBuilder() throws IOException;

XContentBuilder newErrorBuilder() throws IOException;
Expand All @@ -46,4 +47,5 @@ public interface RestChannel {
boolean detailedErrorsEnabled();

void sendResponse(RestResponse response);

}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void dispatchRequest(final RestRequest request, final RestChannel channel
if (!checkRequestParameters(request, channel)) {
return;
}
try (ThreadContext.StoredContext t = threadContext.stashContext()) {
try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
for (String key : headersToCopy) {
String httpHeader = request.header(key);
if (httpHeader != null) {
Expand Down
Loading