Skip to content

Commit

Permalink
Migrate to Netty 4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed Dec 6, 2017
1 parent 1646c0d commit 792c85c
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 198 deletions.
12 changes: 12 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@
<goals>package</goals>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!-- Run unit tests with Netty's strictest buffer-leak detection
     (io.netty.leakDetection.level=PARANOID) so ByteBuf reference-counting
     mistakes introduced by the Netty 4 migration are reported during tests. -->
<systemProperties>
<property>
<name>io.netty.leakDetection.level</name>
<value>PARANOID</value>
</property>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</project>
113 changes: 44 additions & 69 deletions src/main/java/org/graylog/plugins/beats/BeatsFrameDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,24 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.zip.InflaterInputStream;

Expand Down Expand Up @@ -68,26 +67,13 @@ enum DecodingState {
private long windowSize;
private long sequenceNum;


/**
 * Creates a frame decoder starting in the initial protocol state.
 * Netty 4's {@code ReplayingDecoder} has no 'unfold' constructor flag,
 * so only the initial state is passed to the superclass.
 */
public BeatsFrameDecoder() {
    super(DecodingState.PROTOCOL_VERSION);
}

// Netty 3.x-era override, removed by the Netty 4.1 migration: the 4.x
// ReplayingDecoder provides its own decodeLast(ctx, in, out) handling.
@Override
protected Object decodeLast(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, DecodingState state) throws Exception {
// ignore, because can't send ACK after frame read
// Drain whatever is left so the replaying decoder doesn't retry the
// partial frame when the connection closes; nothing is emitted.
if (buffer.readable()) {
buffer.readBytes(super.actualReadableBytes());
}

// Reset to the initial state for symmetry; the channel is going away anyway.
checkpoint(DecodingState.PROTOCOL_VERSION);
return null;
}

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, DecodingState state) throws Exception {
ChannelBuffer[] events = null;
switch (state) {
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> list) throws Exception {
switch (state()) {
case PROTOCOL_VERSION:
checkVersion(buffer);
checkpoint(DecodingState.FRAME_TYPE);
Expand All @@ -110,57 +96,49 @@ protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffe
default:
throw new Exception("Unknown frame type: " + frameType);
}
return null;
return;
case FRAME_WINDOW_SIZE:
processWindowSizeFrame(buffer);
break;
case FRAME_DATA:
events = parseDataFrame(channel, buffer);
list.addAll(parseDataFrame(channelHandlerContext.channel(), buffer));
break;
case FRAME_COMPRESSED:
events = processCompressedFrame(channel, buffer);
list.addAll(processCompressedFrame(channelHandlerContext.channel(), buffer));
break;
case FRAME_JSON:
events = parseJsonFrame(channel, buffer);
list.addAll(parseJsonFrame(channelHandlerContext.channel(), buffer));
break;
default:
throw new Exception("Unknown decoding state: " + state);
throw new Exception("Unknown decoding state: " + state());
}

checkpoint(DecodingState.PROTOCOL_VERSION);
return events;
}

@Nullable
private ChannelBuffer[] processUncompressedBuffer(Channel channel, ChannelBuffer buffer) throws Exception {
private Collection<ByteBuf> processUncompressedBuffer(Channel channel, ByteBuf buffer) throws Exception {
checkVersion(buffer);
byte frameType = buffer.readByte();

ChannelBuffer[] events = null;
switch (frameType) {
case FRAME_WINDOW_SIZE:
processWindowSizeFrame(buffer);
break;
return Collections.emptyList();
case FRAME_DATA:
events = parseDataFrame(channel, buffer);
break;
return parseDataFrame(channel, buffer);
case FRAME_COMPRESSED:
events = processCompressedFrame(channel, buffer);
break;
return processCompressedFrame(channel, buffer);
case FRAME_JSON:
events = parseJsonFrame(channel, buffer);
break;
return parseJsonFrame(channel, buffer);
default:
throw new Exception("Unknown frame type: " + frameType);
}

return events;
}

private void checkVersion(ChannelBuffer channelBuffer) throws Exception {
private void checkVersion(ByteBuf channelBuffer) {
byte version = channelBuffer.readByte();
if (version != PROTOCOL_VERSION) {
throw new Exception("Unknown beats protocol version: " + version);
throw new IllegalStateException("Unknown beats protocol version: " + version);
}
}

Expand All @@ -169,68 +147,66 @@ private void checkVersion(ChannelBuffer channelBuffer) throws Exception {
*/
/**
 * Sends an ACK frame (version, frame type, sequence number) once the current
 * sequence number reaches the negotiated window size; otherwise does nothing.
 */
private void sendACK(Channel channel) throws IOException {
    if (sequenceNum == windowSize) {
        // Allocate via the channel's allocator (pooled in Netty 4) instead of
        // the old static ChannelBuffers helper. 6 bytes = 1 + 1 + 4.
        final ByteBuf buffer = channel.alloc().buffer(6);
        buffer.writeByte(PROTOCOL_VERSION);
        buffer.writeByte(FRAME_ACK);
        buffer.writeInt((int) sequenceNum);

        LOG.trace("Sending ACK for sequence number {} on channel {}", sequenceNum, channel);
        // writeAndFlush: in Netty 4 a plain write() only queues the message.
        channel.writeAndFlush(buffer);
    }
}

/**
* <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type">'json' frame type</a>
*/
private ChannelBuffer[] parseJsonFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
private Collection<ByteBuf> parseJsonFrame(Channel channel, ByteBuf channelBuffer) throws IOException {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

final int jsonLength = Ints.saturatedCast(channelBuffer.readUnsignedInt());

final ChannelBuffer buffer = channelBuffer.readSlice(jsonLength);
final ByteBuf buffer = channelBuffer.readBytes(jsonLength);
sendACK(channel);

return new ChannelBuffer[]{buffer};
return Collections.singleton(buffer);
}

/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type">'compressed' frame type</a>
*/
private ChannelBuffer[] processCompressedFrame(Channel channel, ChannelBuffer channelBuffer) throws Exception {
private Collection<ByteBuf> processCompressedFrame(Channel channel, ByteBuf channelBuffer) throws Exception {
final long payloadLength = channelBuffer.readUnsignedInt();
final byte[] data = new byte[(int) payloadLength];
channelBuffer.readBytes(data);
try (final ByteArrayInputStream dataStream = new ByteArrayInputStream(data);
final InputStream in = new InflaterInputStream(dataStream)) {
final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(ByteStreams.toByteArray(in));
final InputStream in = new InflaterInputStream(dataStream)) {
final ByteBuf buffer = Unpooled.wrappedBuffer(ByteStreams.toByteArray(in));
return processCompressedDataFrames(channel, buffer);
}
}

private ChannelBuffer[] processCompressedDataFrames(Channel channel, ChannelBuffer channelBuffer) throws Exception {
final List<ChannelBuffer> events = new ArrayList<>();
while (channelBuffer.readable()) {
final ChannelBuffer[] buffers = processUncompressedBuffer(channel, channelBuffer);
if (buffers != null) {
Iterables.addAll(events, Arrays.asList(buffers));
}
private Collection<ByteBuf> processCompressedDataFrames(Channel channel, ByteBuf channelBuffer) throws Exception {
final List<ByteBuf> events = new ArrayList<>();
while (channelBuffer.isReadable()) {
final Collection<ByteBuf> buffers = processUncompressedBuffer(channel, channelBuffer);
events.addAll(buffers);
}
return events.toArray(new ChannelBuffer[0]);
return events;
}

/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type">'window size' frame type</a>
*/
private void processWindowSizeFrame(ChannelBuffer channelBuffer) {
private void processWindowSizeFrame(ByteBuf channelBuffer) {
windowSize = channelBuffer.readUnsignedInt();
LOG.trace("Changed window size to {}", windowSize);
}

/**
* @see <a href="https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type">'data' frame type</a>
*/
private ChannelBuffer[] parseDataFrame(Channel channel, ChannelBuffer channelBuffer) throws IOException {
private Collection<ByteBuf> parseDataFrame(Channel channel, ByteBuf channelBuffer) throws IOException {
sequenceNum = channelBuffer.readUnsignedInt();
LOG.trace("Received sequence number {}", sequenceNum);

Expand All @@ -247,17 +223,16 @@ private ChannelBuffer[] parseDataFrame(Channel channel, ChannelBuffer channelBuf
jg.writeEndObject();
}

final ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(outputStream.toByteArray());
final ByteBuf buffer = Unpooled.wrappedBuffer(outputStream.toByteArray());
sendACK(channel);

return new ChannelBuffer[]{buffer};
return Collections.singleton(buffer);
}

private String parseDataItem(ChannelBuffer channelBuffer) {
long length = channelBuffer.readUnsignedInt();
final byte[] bytes = new byte[(int) length];
channelBuffer.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
private String parseDataItem(ByteBuf buf) {
int length = Ints.saturatedCast(buf.readUnsignedInt());
final ByteBuf item = buf.readSlice(length);
return item.toString(StandardCharsets.UTF_8);
}

@VisibleForTesting
Expand Down
35 changes: 6 additions & 29 deletions src/main/java/org/graylog/plugins/beats/BeatsTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package org.graylog.plugins.beats;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.assistedinject.Assisted;
import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import org.graylog2.inputs.transports.NettyTransportConfiguration;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.configuration.ConfigurationRequest;
Expand All @@ -31,43 +31,20 @@
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ConnectionCounter;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.jboss.netty.channel.ChannelHandler;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.codahale.metrics.MetricRegistry.name;

public class BeatsTransport extends AbstractTcpTransport {
/**
 * Injected constructor for the Netty 4 based transport. The transport no longer
 * builds its own boss/worker thread pools: the shared {@code EventLoopGroup} and
 * {@code NettyTransportConfiguration} are injected and handed to the parent
 * {@code AbstractTcpTransport}.
 */
@Inject
public BeatsTransport(@Assisted Configuration configuration,
                      EventLoopGroup eventLoopGroup,
                      NettyTransportConfiguration nettyTransportConfiguration,
                      ThroughputCounter throughputCounter,
                      LocalMetricRegistry localRegistry,
                      ConnectionCounter connectionCounter) {
    super(configuration, throughputCounter, localRegistry, eventLoopGroup, nettyTransportConfiguration, connectionCounter);
}

@Override
Expand Down
Loading

0 comments on commit 792c85c

Please sign in to comment.