Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Migrate to Netty 4.1 #28

Merged
merged 2 commits into from
Jan 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
Expand Down Expand Up @@ -248,7 +254,7 @@
<configuration>
<systemProperties>
<property>
<name>io.netty.leakDetectionLevel</name>
<name>io.netty.leakDetection.level</name>
<value>PARANOID</value>
</property>
</systemProperties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ public class NetFlowCodec extends AbstractCodec implements MultiMessageCodec {
static final String CK_NETFLOW9_DEFINITION_PATH = "netflow9_definitions_Path";
private static final Logger LOG = LoggerFactory.getLogger(NetFlowCodec.class);
private final NetFlowV9FieldTypeRegistry typeRegistry;
private final NetflowV9CodecAggregator netflowV9CodecAggregator;

@Inject
protected NetFlowCodec(@Assisted Configuration configuration) throws IOException {
protected NetFlowCodec(@Assisted Configuration configuration, NetflowV9CodecAggregator netflowV9CodecAggregator) throws IOException {
super(configuration);
this.netflowV9CodecAggregator = netflowV9CodecAggregator;

final String netFlow9DefinitionsPath = configuration.getString(CK_NETFLOW9_DEFINITION_PATH);
if (netFlow9DefinitionsPath == null || netFlow9DefinitionsPath.trim().isEmpty()) {
Expand All @@ -97,8 +99,7 @@ protected NetFlowCodec(@Assisted Configuration configuration) throws IOException
@Nullable
@Override
public CodecAggregator getAggregator() {
// this is intentional: we replace the entire channel handler in NetFlowUdpTransport because we need a different signature
return null;
return netflowV9CodecAggregator;
}

@Nullable
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package org.graylog.plugins.netflow.codecs;

import io.netty.buffer.ByteBuf;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.jboss.netty.buffer.ChannelBuffer;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -26,10 +26,10 @@ public interface RemoteAddressCodecAggregator extends CodecAggregator {

@Nonnull
@Override
default Result addChunk(ChannelBuffer buf) {
default Result addChunk(ByteBuf buf) {
return addChunk(buf, null);
}

@Nonnull
Result addChunk(ChannelBuffer buf, @Nullable SocketAddress remoteAddress);
Result addChunk(ByteBuf buf, @Nullable SocketAddress remoteAddress);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,62 +15,52 @@
*/
package org.graylog.plugins.netflow.transport;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.inject.assistedinject.Assisted;
import org.graylog.plugins.netflow.codecs.NetflowV9CodecAggregator;
import io.netty.channel.ChannelHandler;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.inputs.transports.NettyTransportConfiguration;
import org.graylog2.inputs.transports.UdpTransport;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.net.SocketAddress;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;

import static org.jboss.netty.channel.Channels.fireMessageReceived;

/**
* This UDP transport is largely identical to its superclass, but replaces the codec aggregator and its handler with custom
* implementations that are able to pass the remote address.
* <br/>
*
* Without the remote address the NetFlow V9 code cannot distinguish between flows from different exporters and thus might
* handle template flows incorrectly should they differ between exporters. See https://tools.ietf.org/html/rfc3954#section-5.1 "Source ID".
* handle template flows incorrectly should they differ between exporters.
*
* @see <a href="https://tools.ietf.org/html/rfc3954#section-5.1">RFC 3953 - Source ID</a>
*/
public class NetFlowUdpTransport extends UdpTransport {
private static final Logger LOG = LoggerFactory.getLogger(NetFlowUdpTransport.class);

private final NetflowV9CodecAggregator netflowV9CodecAggregator;

@Inject
public NetFlowUdpTransport(@Assisted Configuration configuration,
EventLoopGroupFactory eventLoopGroupFactory,
NettyTransportConfiguration nettyTransportConfiguration,
ThroughputCounter throughputCounter,
LocalMetricRegistry localRegistry,
NetflowV9CodecAggregator netflowV9CodecAggregator) {
super(configuration, throughputCounter, localRegistry);
this.netflowV9CodecAggregator = netflowV9CodecAggregator;
LocalMetricRegistry localRegistry) {
super(configuration, eventLoopGroupFactory, nettyTransportConfiguration, throughputCounter, localRegistry);
}

@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getFinalChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> finalChannelHandlers = super.getFinalChannelHandlers(input);
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>();
// replace the codec-aggregator handler with one that passes the remote address
handlers.put("codec-aggregator", () -> new NetflowMessageAggregationHandler(netflowV9CodecAggregator));
handlers.putAll(finalChannelHandlers);
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>(super.getChannelHandlers(input));

// Replace the default "codec-aggregator" handler with one that passes the remote address
final RemoteAddressCodecAggregator aggregator = (RemoteAddressCodecAggregator) getAggregator();
handlers.replace("codec-aggregator", () -> new NetflowMessageAggregationHandler(aggregator, localRegistry));
handlers.remove("udp-datagram");

return handlers;
}

Expand All @@ -80,50 +70,10 @@ public interface Factory extends Transport.Factory<NetFlowUdpTransport> {
NetFlowUdpTransport create(Configuration configuration);

@Override
UdpTransport.Config getConfig();
NetFlowUdpTransport.Config getConfig();
}

@ConfigClass
public static class Config extends UdpTransport.Config {
}

private class NetflowMessageAggregationHandler extends SimpleChannelHandler {
private final NetflowV9CodecAggregator aggregator;
private final Timer aggregationTimer;
private final Meter invalidChunksMeter;

public NetflowMessageAggregationHandler(NetflowV9CodecAggregator aggregator) {
this.aggregator = aggregator;
aggregationTimer = localRegistry.timer("aggregationTime");
invalidChunksMeter = localRegistry.meter("invalidMessages");
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final Object message = e.getMessage();

final SocketAddress remoteAddress = e.getRemoteAddress();
if (message instanceof ChannelBuffer) {
final ChannelBuffer buf = (ChannelBuffer) message;
final CodecAggregator.Result result;
try (Timer.Context ignored = aggregationTimer.time()) {
result = aggregator.addChunk(buf, remoteAddress);
}
final ChannelBuffer completeMessage = result.getMessage();
if (completeMessage != null) {
LOG.debug("Message aggregation completion, forwarding {}", completeMessage);
fireMessageReceived(ctx, completeMessage, remoteAddress);
} else if (result.isValid()) {
LOG.debug("More chunks necessary to complete this message");
} else {
invalidChunksMeter.mark();
LOG.debug("Message chunk was not valid and discarded.");
}
} else {
LOG.debug("Could not handle netty message {}, sending further upstream.", e);
fireMessageReceived(ctx, message, remoteAddress);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2017 Graylog Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.graylog.plugins.netflow.transport;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class NetflowMessageAggregationHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final Logger LOG = LoggerFactory.getLogger(NetflowMessageAggregationHandler.class);

private final RemoteAddressCodecAggregator aggregator;
private final Timer aggregationTimer;
private final Meter invalidChunksMeter;

public NetflowMessageAggregationHandler(RemoteAddressCodecAggregator aggregator, MetricRegistry metricRegistry) {
this.aggregator = aggregator;
aggregationTimer = metricRegistry.timer("aggregationTime");
invalidChunksMeter = metricRegistry.meter("invalidMessages");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
final SocketAddress remoteAddress = msg.sender();
final CodecAggregator.Result result;
try (Timer.Context ignored = aggregationTimer.time()) {
result = aggregator.addChunk(msg.content(), remoteAddress);
}
final ByteBuf completeMessage = result.getMessage();
if (completeMessage != null) {
LOG.debug("Message aggregation completion, forwarding {}", completeMessage);
ctx.fireChannelRead(completeMessage);
} else if (result.isValid()) {
LOG.debug("More chunks necessary to complete this message");
} else {
invalidChunksMeter.mark();
LOG.debug("Message chunk was not valid and discarded.");
}
}
}
53 changes: 29 additions & 24 deletions src/main/java/org/graylog/plugins/netflow/v9/NetFlowV9Parser.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/*
* Copyright 2013 Eediom Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -20,6 +20,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import org.graylog.plugins.netflow.flows.EmptyTemplateException;
import org.graylog.plugins.netflow.flows.InvalidFlowVersionException;
import org.slf4j.Logger;
Expand Down Expand Up @@ -157,8 +158,8 @@ public static NetFlowV9Template parseTemplate(ByteBuf bb, NetFlowV9FieldTypeRegi
/**
* Like above, but only retrieves the bytes and template ids
*/
public static List<Map.Entry<Integer, ByteBuf>> parseTemplatesShallow(ByteBuf bb) {
final ImmutableList.Builder<Map.Entry<Integer, ByteBuf>> templates = ImmutableList.builder();
public static List<Map.Entry<Integer, byte[]>> parseTemplatesShallow(ByteBuf bb) {
final ImmutableList.Builder<Map.Entry<Integer, byte[]>> templates = ImmutableList.builder();
int len = bb.readUnsignedShort();

int p = 4; // flow set id and length field itself
Expand All @@ -168,10 +169,12 @@ public static List<Map.Entry<Integer, ByteBuf>> parseTemplatesShallow(ByteBuf bb
final int fieldCount = bb.readUnsignedShort();
final ImmutableList.Builder<NetFlowV9FieldDef> fieldDefs = ImmutableList.builder();
for (int i = 0; i < fieldCount; i++) {
/* int fieldType = */ bb.readUnsignedShort();
/* int fieldLength = */ bb.readUnsignedShort();
int fieldType = bb.readUnsignedShort();
int fieldLength = bb.readUnsignedShort();
}
templates.add(Maps.immutableEntry(templateId, bb.copy(start, bb.readerIndex() - start)));
final byte[] bytes = ByteBufUtil.getBytes(bb, start, bb.readerIndex() - start);
final Map.Entry<Integer, byte[]> template = Maps.immutableEntry(templateId, bytes);
templates.add(template);

p += 4 + fieldCount * 4;
}
Expand Down Expand Up @@ -240,7 +243,7 @@ public static NetFlowV9OptionTemplate parseOptionTemplate(ByteBuf bb, NetFlowV9F
return NetFlowV9OptionTemplate.create(templateId, scopeDefs.build(), optionDefs.build());
}

public static Map.Entry<Integer, ByteBuf> parseOptionTemplateShallow(ByteBuf bb) {
public static Map.Entry<Integer, byte[]> parseOptionTemplateShallow(ByteBuf bb) {
final int start = bb.readerIndex();
int length = bb.readUnsignedShort();
final int templateId = bb.readUnsignedShort();
Expand Down Expand Up @@ -269,7 +272,8 @@ public static Map.Entry<Integer, ByteBuf> parseOptionTemplateShallow(ByteBuf bb)
// skip padding
bb.readerIndex(endOfTemplate);

return Maps.immutableEntry(templateId, bb.copy(start, bb.readerIndex() - start));
final byte[] bytes = ByteBufUtil.getBytes(bb, start, bb.readerIndex() - start);
return Maps.immutableEntry(templateId, bytes);
}

/**
Expand Down Expand Up @@ -354,29 +358,30 @@ public static Integer parseRecordShallow(ByteBuf bb) {


public static RawNetFlowV9Packet parsePacketShallow(ByteBuf bb) {
final int dataLength = bb.readableBytes();
final NetFlowV9Header header = parseHeader(bb);
final ByteBuf buf = bb.duplicate();

Map<Integer, ByteBuf> allTemplates = Maps.newHashMap();
Set<Integer> usedTemplates = Sets.newHashSet();
Map.Entry<Integer, ByteBuf> optTemplate = null;
while (bb.isReadable()) {
bb.markReaderIndex();
int flowSetId = bb.readUnsignedShort();
final int dataLength = buf.readableBytes();
final NetFlowV9Header header = parseHeader(buf);
final Map<Integer, byte[]> allTemplates = Maps.newHashMap();
Map.Entry<Integer, byte[]> optTemplate = null;
final Set<Integer> usedTemplates = Sets.newHashSet();

while (buf.isReadable()) {
buf.markReaderIndex();
int flowSetId = buf.readUnsignedShort();
if (flowSetId == 0) {
final List<Map.Entry<Integer, ByteBuf>> templates = parseTemplatesShallow(bb);
for (Map.Entry<Integer, ByteBuf> t : templates) {
final List<Map.Entry<Integer, byte[]>> templates = parseTemplatesShallow(buf);
for (Map.Entry<Integer, byte[]> t : templates) {
allTemplates.put(t.getKey(), t.getValue());
}
} else if (flowSetId == 1) {
optTemplate = parseOptionTemplateShallow(bb);
optTemplate = parseOptionTemplateShallow(buf);
} else {
bb.resetReaderIndex();
usedTemplates.add(parseRecordShallow(bb));
buf.resetReaderIndex();
usedTemplates.add(parseRecordShallow(buf));
}
}

return RawNetFlowV9Packet.create(header, dataLength, allTemplates, optTemplate, usedTemplates);
}

}
Loading