-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix missing "source" field for UDP transport based messages
This fixes an issue that has been introduced with the big Netty 4.1 update in PR #4397. In Netty 3.x we always passed a "MessageEvent" through the channel pipeline and got the remote address from that object. Since this object doesn't exist anymore in Netty 4.x, we only pass the message payload's "ByteBuf" through the pipeline and rely on the "Channel#getRemoteAddress" method to always return the remote address object. The problem is that this does indeed work for TCP based channels but it doesn't work for UDP based channels. For UDP channels the "#getRemoteAddress()" method always returns "null". This is probably due to the connection-less nature of UDP. For UDP transports Netty only creates a single channel. For TCP transports there is one channel per TCP connection To fix this we need to get our hands on the remote address when we create the "RawMessage" object at the very end of the Netty pipeline. Since we only pass the message payload "ByteBuf" through the Netty pipeline, we could previously reuse several classes for TCP and UDP transports because they were basically the same. For UDP transports we now need to carry the remote address through the pipeline by using a "AddressedEnvelope" (available in Netty) that takes a payload and a sender/receiver object. That means we have to create a few UDP specific - or rather "AddressedEnvelope" specific - pipeline handlers because the shared ones only know how to handle "ByteBuf" messages. This PR moves some shared code out of the "NettyTransport" class up to "AbstractTcpTransport" and "UdpTransport" so we can customize the pipeline for the two different payload types. It also creates new message aggregation handlers for the "AddressedEnvelope" objects. In the future we can probably refactor this to share some more code, but for 3.0 I tried to change as few as possible. The TCP pipeline is basically unchanged apart from the "AbstractTcpTransport" change. Fixes #5264 Fixes #5293
- Loading branch information
Showing
9 changed files
with
191 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
62 changes: 62 additions & 0 deletions
62
...src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageAggregationHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/** | ||
* This file is part of Graylog. | ||
* | ||
* Graylog is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.transports.netty; | ||
|
||
import com.codahale.metrics.Meter; | ||
import com.codahale.metrics.MetricRegistry; | ||
import com.codahale.metrics.Timer; | ||
import io.netty.buffer.ByteBuf; | ||
import io.netty.channel.AddressedEnvelope; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.SimpleChannelInboundHandler; | ||
import org.graylog2.plugin.inputs.codecs.CodecAggregator; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.net.InetSocketAddress; | ||
|
||
public class EnvelopeMessageAggregationHandler extends SimpleChannelInboundHandler<AddressedEnvelope<ByteBuf, InetSocketAddress>> { | ||
private static final Logger LOG = LoggerFactory.getLogger(EnvelopeMessageAggregationHandler.class); | ||
|
||
private final CodecAggregator aggregator; | ||
private final Timer aggregationTimer; | ||
private final Meter invalidChunksMeter; | ||
|
||
public EnvelopeMessageAggregationHandler(CodecAggregator aggregator, MetricRegistry metricRegistry) { | ||
this.aggregator = aggregator; | ||
aggregationTimer = metricRegistry.timer("aggregationTime"); | ||
invalidChunksMeter = metricRegistry.meter("invalidMessages"); | ||
} | ||
|
||
@Override | ||
protected void channelRead0(ChannelHandlerContext ctx, AddressedEnvelope<ByteBuf, InetSocketAddress> envelope) throws Exception { | ||
final CodecAggregator.Result result; | ||
try (Timer.Context ignored = aggregationTimer.time()) { | ||
result = aggregator.addChunk(envelope.content()); | ||
} | ||
final ByteBuf completeMessage = result.getMessage(); | ||
if (completeMessage != null) { | ||
LOG.debug("Message aggregation completion, forwarding {}", completeMessage); | ||
ctx.fireChannelRead(SenderEnvelope.of(completeMessage, envelope.sender())); | ||
} else if (result.isValid()) { | ||
LOG.debug("More chunks necessary to complete this message"); | ||
} else { | ||
invalidChunksMeter.mark(); | ||
LOG.debug("Message chunk was not valid and discarded."); | ||
} | ||
} | ||
} |
55 changes: 55 additions & 0 deletions
55
...og2-server/src/main/java/org/graylog2/inputs/transports/netty/EnvelopeMessageHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
/** | ||
* This file is part of Graylog. | ||
* | ||
* Graylog is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.transports.netty; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
import io.netty.channel.AddressedEnvelope; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.SimpleChannelInboundHandler; | ||
import org.graylog2.plugin.inputs.MessageInput; | ||
import org.graylog2.plugin.inputs.transports.NettyTransport; | ||
import org.graylog2.plugin.journal.RawMessage; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.net.InetSocketAddress; | ||
|
||
public class EnvelopeMessageHandler extends SimpleChannelInboundHandler<AddressedEnvelope<ByteBuf, InetSocketAddress>> { | ||
private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class); | ||
|
||
private final MessageInput input; | ||
|
||
public EnvelopeMessageHandler(MessageInput input) { | ||
this.input = input; | ||
} | ||
|
||
@Override | ||
protected void channelRead0(ChannelHandlerContext ctx, AddressedEnvelope<ByteBuf, InetSocketAddress> envelope) throws Exception { | ||
final ByteBuf msg = envelope.content(); | ||
final byte[] bytes = new byte[msg.readableBytes()]; | ||
msg.readBytes(bytes); | ||
final RawMessage raw = new RawMessage(bytes, envelope.sender()); | ||
input.processRawMessage(raw); | ||
} | ||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | ||
LOG.debug("Could not handle message, closing connection: {}", cause); | ||
ctx.channel().close(); | ||
super.exceptionCaught(ctx, cause); | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/SenderEnvelope.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/** | ||
* This file is part of Graylog. | ||
* | ||
* Graylog is free software: you can redistribute it and/or modify | ||
* it under the terms of the GNU General Public License as published by | ||
* the Free Software Foundation, either version 3 of the License, or | ||
* (at your option) any later version. | ||
* | ||
* Graylog is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* GNU General Public License for more details. | ||
* | ||
* You should have received a copy of the GNU General Public License | ||
* along with Graylog. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
package org.graylog2.inputs.transports.netty; | ||
|
||
import io.netty.channel.AddressedEnvelope; | ||
import io.netty.channel.DefaultAddressedEnvelope; | ||
|
||
import java.net.InetSocketAddress; | ||
|
||
/** | ||
* Helper class to simplify envelope creation. | ||
*/ | ||
public class SenderEnvelope { | ||
/** | ||
* Returns a {@link AddressedEnvelope} of the given message and message sender values. | ||
* | ||
* @param message the message | ||
* @param sender the sender address | ||
* @param <M> message type | ||
* @param <A> sender type | ||
* @return the envelope | ||
*/ | ||
public static <M, A extends InetSocketAddress> AddressedEnvelope<M, A> of(M message, A sender) { | ||
return new DefaultAddressedEnvelope<>(message, null, sender); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters