Skip to content

Commit

Permalink
Fix missing "source" field for UDP transport based messages (#5512) (#…
Browse files Browse the repository at this point in the history
…5519)

This fixes an issue that has been introduced with the big Netty 4.1
update in PR #4397.

In Netty 3.x we always passed a "MessageEvent" through the channel
pipeline and got the remote address from that object. Since this object
doesn't exist anymore in Netty 4.x, we only pass the message payload's
"ByteBuf" through the pipeline and rely on the "Channel#getRemoteAddress"
method to always return the remote address object.

The problem is that this does indeed work for TCP based channels but it
doesn't work for UDP based channels. For UDP channels the
"#getRemoteAddress()" method always returns "null".

This is probably due to the connection-less nature of UDP.
For UDP transports Netty only creates a single channel. For TCP transports
there is one channel per TCP connection

To fix this we need to get our hands on the remote address when we
create the "RawMessage" object at the very end of the Netty pipeline.

Since we only pass the message payload "ByteBuf" through the Netty
pipeline, we could previously reuse several classes for TCP and UDP
transports because they were basically the same.

For UDP transports we now need to carry the remote address through the
pipeline by using a "AddressedEnvelope" (available in Netty) that takes
a payload and a sender/receiver object.

That means we have to create a few UDP specific - or rather
"AddressedEnvelope" specific - pipeline handlers because the shared ones
only know how to handle "ByteBuf" messages.

This PR moves some shared code out of the "NettyTransport" class up to
"AbstractTcpTransport" and "UdpTransport" so we can customize the
pipeline for the two different payload types. It also creates new
message aggregation handlers for the "AddressedEnvelope" objects.

In the future we can probably refactor this to share some more code, but
for 3.0 I tried to change as few as possible. The TCP pipeline is
basically unchanged apart from the "AbstractTcpTransport" change.

Fixes #5264
Fixes #5293

**Note:** This needs to be cherry-picked into 3.0

(cherry picked from commit 375e618)
  • Loading branch information
bernd authored and kroepke committed Jan 16, 2019
1 parent dc772a5 commit 8868151
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import org.graylog.plugins.netflow.codecs.RemoteAddressCodecAggregator;
import org.graylog2.inputs.transports.netty.SenderEnvelope;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

public class NetflowMessageAggregationHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final Logger LOG = LoggerFactory.getLogger(NetflowMessageAggregationHandler.class);
Expand All @@ -46,15 +46,15 @@ public NetflowMessageAggregationHandler(RemoteAddressCodecAggregator aggregator,

@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
final SocketAddress remoteAddress = msg.sender();
final InetSocketAddress remoteAddress = msg.sender();
final CodecAggregator.Result result;
try (Timer.Context ignored = aggregationTimer.time()) {
result = aggregator.addChunk(msg.content(), remoteAddress);
}
final ByteBuf completeMessage = result.getMessage();
if (completeMessage != null) {
LOG.debug("Message aggregation completion, forwarding {}", completeMessage);
ctx.fireChannelRead(completeMessage);
ctx.fireChannelRead(SenderEnvelope.of(completeMessage, remoteAddress));
} else if (result.isValid()) {
LOG.debug("More chunks necessary to complete this message");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.graylog2.inputs.transports;

import com.codahale.metrics.MetricRegistry;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
Expand All @@ -37,6 +36,8 @@
import io.netty.util.concurrent.GlobalEventExecutor;
import org.graylog2.inputs.transports.netty.DatagramChannelFactory;
import org.graylog2.inputs.transports.netty.DatagramPacketHandler;
import org.graylog2.inputs.transports.netty.EnvelopeMessageAggregationHandler;
import org.graylog2.inputs.transports.netty.EnvelopeMessageHandler;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.NettyTransportType;
import org.graylog2.plugin.LocalMetricRegistry;
Expand All @@ -46,6 +47,7 @@
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.annotations.FactoryClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.graylog2.plugin.inputs.transports.Transport;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
Expand Down Expand Up @@ -103,6 +105,19 @@ protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChannelHa
return handlers;
}

protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChildChannelHandlers(final MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlerList = new LinkedHashMap<>(getCustomChildChannelHandlers(input));

final CodecAggregator aggregator = getAggregator();
if (aggregator != null) {
LOG.debug("Adding codec aggregator {} to channel pipeline", aggregator);
handlerList.put("codec-aggregator", () -> new EnvelopeMessageAggregationHandler(aggregator, localRegistry));
}
handlerList.put("envelope-message-handler", () -> new EnvelopeMessageHandler(input));

return handlerList;
}

@Override
public void launch(final MessageInput input) throws MisfireException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageAggregationHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class);
public class ByteBufMessageAggregationHandler extends SimpleChannelInboundHandler<ByteBuf> {
private static final Logger LOG = LoggerFactory.getLogger(ByteBufMessageAggregationHandler.class);

private final CodecAggregator aggregator;
private final Timer aggregationTimer;
private final Meter invalidChunksMeter;

public MessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) {
public ByteBufMessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) {
this.aggregator = aggregator;
aggregationTimer = metricRegistry.timer("aggregationTime");
invalidChunksMeter = metricRegistry.meter("invalidMessages");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.graylog2.inputs.transports.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket;
Expand All @@ -31,7 +30,6 @@ public class DatagramPacketHandler extends MessageToMessageDecoder<DatagramPacke

@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception {
final ByteBuf content = msg.content();
out.add(ReferenceCountUtil.retain(content));
out.add(ReferenceCountUtil.retain(SenderEnvelope.of(msg.content(), msg.sender())));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class EnvelopeMessageAggregationHandler extends SimpleChannelInboundHandler<AddressedEnvelope<ByteBuf, InetSocketAddress>> {
private static final Logger LOG = LoggerFactory.getLogger(EnvelopeMessageAggregationHandler.class);

private final CodecAggregator aggregator;
private final Timer aggregationTimer;
private final Meter invalidChunksMeter;

public EnvelopeMessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) {
this.aggregator = aggregator;
aggregationTimer = metricRegistry.timer("aggregationTime");
invalidChunksMeter = metricRegistry.meter("invalidMessages");
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, AddressedEnvelope<ByteBuf, InetSocketAddress> envelope) throws Exception {
final CodecAggregator.Result result;
try (Timer.Context ignored = aggregationTimer.time()) {
result = aggregator.addChunk(envelope.content());
}
final ByteBuf completeMessage = result.getMessage();
if (completeMessage != null) {
LOG.debug("Message aggregation completion, forwarding {}", completeMessage);
ctx.fireChannelRead(SenderEnvelope.of(completeMessage, envelope.sender()));
} else if (result.isValid()) {
LOG.debug("More chunks necessary to complete this message");
} else {
invalidChunksMeter.mark();
LOG.debug("Message chunk was not valid and discarded.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.transports.NettyTransport;
import org.graylog2.plugin.journal.RawMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class EnvelopeMessageHandler extends SimpleChannelInboundHandler<AddressedEnvelope<ByteBuf, InetSocketAddress>> {
private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class);

private final MessageInput input;

public EnvelopeMessageHandler(MessageInput input) {
this.input = input;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, AddressedEnvelope<ByteBuf, InetSocketAddress> envelope) throws Exception {
final ByteBuf msg = envelope.content();
final byte[] bytes = new byte[msg.readableBytes()];
msg.readBytes(bytes);
final RawMessage raw = new RawMessage(bytes, envelope.sender());
input.processRawMessage(raw);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.debug("Could not handle message, closing connection: {}", cause);
ctx.channel().close();
super.exceptionCaught(ctx, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* This file is part of Graylog.
*
* Graylog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog2.inputs.transports.netty;

import io.netty.channel.AddressedEnvelope;
import io.netty.channel.DefaultAddressedEnvelope;

import java.net.InetSocketAddress;

/**
* Helper class to simplify envelope creation.
*/
public class SenderEnvelope {
/**
* Returns a {@link AddressedEnvelope} of the given message and message sender values.
*
* @param message the message
* @param sender the sender address
* @param <M> message type
* @param <A> sender type
* @return the envelope
*/
public static <M, A extends InetSocketAddress> AddressedEnvelope<M, A> of(M message, A sender) {
return new DefaultAddressedEnvelope<>(message, null, sender);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.graylog2.plugin.inputs.transports;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.netty.bootstrap.ServerBootstrap;
Expand All @@ -44,6 +43,8 @@
import org.graylog2.inputs.transports.netty.ChannelRegistrationHandler;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.ExceptionLoggingChannelHandler;
import org.graylog2.inputs.transports.netty.ByteBufMessageAggregationHandler;
import org.graylog2.inputs.transports.netty.RawMessageHandler;
import org.graylog2.inputs.transports.netty.ServerSocketChannelFactory;
import org.graylog2.plugin.LocalMetricRegistry;
import org.graylog2.plugin.configuration.Configuration;
Expand All @@ -55,6 +56,7 @@
import org.graylog2.plugin.inputs.MessageInput;
import org.graylog2.plugin.inputs.MisfireException;
import org.graylog2.plugin.inputs.annotations.ConfigClass;
import org.graylog2.plugin.inputs.codecs.CodecAggregator;
import org.graylog2.plugin.inputs.transports.util.KeyUtil;
import org.graylog2.plugin.inputs.util.ConnectionCounter;
import org.graylog2.plugin.inputs.util.ThroughputCounter;
Expand All @@ -75,7 +77,6 @@
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -227,6 +228,7 @@ public void stop() {
@Override
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChildChannelHandlers(MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>();
final CodecAggregator aggregator = getAggregator();

handlers.put("channel-registration", () -> new ChannelRegistrationHandler(childChannels));
handlers.put("traffic-counter", () -> throughputCounter);
Expand All @@ -235,7 +237,12 @@ protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChildChan
LOG.info("Enabled TLS for input [{}/{}]. key-file=\"{}\" cert-file=\"{}\"", input.getName(), input.getId(), tlsKeyFile, tlsCertFile);
handlers.put("tls", getSslHandlerCallable(input));
}
handlers.putAll(super.getChildChannelHandlers(input));
handlers.putAll(getCustomChildChannelHandlers(input));
if (aggregator != null) {
LOG.debug("Adding codec aggregator {} to channel pipeline", aggregator);
handlers.put("codec-aggregator", () -> new ByteBufMessageAggregationHandler(aggregator, localRegistry));
}
handlers.put("rawmessage-handler", () -> new RawMessageHandler(input));
handlers.put("exception-logger", () -> new ExceptionLoggingChannelHandler(input, LOG));

return handlers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.netty.channel.ChannelPipeline;
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory;
import org.graylog2.inputs.transports.netty.ExceptionLoggingChannelHandler;
import org.graylog2.inputs.transports.netty.MessageAggregationHandler;
import org.graylog2.inputs.transports.netty.PromiseFailureHandler;
import org.graylog2.inputs.transports.netty.RawMessageHandler;
import org.graylog2.plugin.LocalMetricRegistry;
Expand Down Expand Up @@ -176,17 +175,7 @@ protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getCustomChi
* @return list of custom {@link ChannelHandler channel handlers} to add to the Netty {@link ChannelPipeline channel pipeline} for child channels
* @see #getCustomChildChannelHandlers(MessageInput)
*/
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getChildChannelHandlers(final MessageInput input) {
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlerList = new LinkedHashMap<>(getCustomChildChannelHandlers(input));

if (aggregator != null) {
log.debug("Adding codec aggregator {} to channel pipeline", aggregator);
handlerList.put("codec-aggregator", () -> new MessageAggregationHandler(aggregator, localRegistry));
}
handlerList.put("rawmessage-handler", () -> new RawMessageHandler(input));

return handlerList;
}
protected abstract LinkedHashMap<String, Callable<? extends ChannelHandler>> getChildChannelHandlers(final MessageInput input);

protected int getRecvBufferSize() {
return recvBufferSize;
Expand Down

0 comments on commit 8868151

Please sign in to comment.