Skip to content

Commit

Permalink
Fix reverse port issue pcap capture (netty#12081)
Browse files Browse the repository at this point in the history
Motivation:

While exploring how to introduce PCAP support into
Vert.x and Quarkus based on Netty's PcapWriteHandler,
I noticed that in server pipelines,
the capture has the source and destination ports
reversed for server pipelines.

This can be seen in the screenshot from eclipse-vertx/vert.x#4260 (comment), where a browser is requesting
data from a REST API built with Quarkus listening on port 8080.
In the screenshot you can see that initial HTTP request has the ports
reversed.

Modification:

This change makes sure that source and destination ports are always properly determined no matter what type of pipeline is being used

Result:

Correct port is used
  • Loading branch information
geoand authored Feb 7, 2022
1 parent 773ac50 commit 54a2c6d
Show file tree
Hide file tree
Showing 2 changed files with 213 additions and 46 deletions.
104 changes: 65 additions & 39 deletions handler/src/main/java/io/netty/handler/pcap/PcapWriteHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,16 @@ public final class PcapWriteHandler extends ChannelDuplexHandler implements Clos
private int receiveSegmentNumber = 1;

/**
* Source Address
* Address of the initiator of the connection
*/
private InetSocketAddress srcAddr;
private InetSocketAddress initiatiorAddr;

/**
* Destination Address
* Address of the receiver of the connection
*/
private InetSocketAddress dstAddr;
private InetSocketAddress handlerAddr;

private boolean isServerPipeline;

/**
* Set to {@code true} if {@link #close()} is called and we should stop writing Pcap.
Expand Down Expand Up @@ -180,11 +182,13 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {

// Capture correct `localAddress` and `remoteAddress`
if (ctx.channel().parent() instanceof ServerSocketChannel) {
srcAddr = (InetSocketAddress) ctx.channel().remoteAddress();
dstAddr = (InetSocketAddress) ctx.channel().localAddress();
isServerPipeline = true;
initiatiorAddr = (InetSocketAddress) ctx.channel().remoteAddress();
handlerAddr = (InetSocketAddress) ctx.channel().localAddress();
} else {
srcAddr = (InetSocketAddress) ctx.channel().localAddress();
dstAddr = (InetSocketAddress) ctx.channel().remoteAddress();
isServerPipeline = false;
initiatiorAddr = (InetSocketAddress) ctx.channel().localAddress();
handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
}

logger.debug("Initiating Fake TCP 3-Way Handshake");
Expand All @@ -193,17 +197,19 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {

try {
// Write SYN with Normal Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, 0, 0, srcAddr.getPort(), dstAddr.getPort(), TCPPacket.TCPFlag.SYN);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
TCPPacket.writePacket(tcpBuf, null, 0, 0,
initiatiorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.SYN);
completeTCPWrite(initiatiorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);

// Write SYN+ACK with Reversed Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, 0, 1, dstAddr.getPort(), srcAddr.getPort(), TCPPacket.TCPFlag.SYN,
TCPPacket.TCPFlag.ACK);
completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
TCPPacket.writePacket(tcpBuf, null, 0, 1,
handlerAddr.getPort(), initiatiorAddr.getPort(), TCPPacket.TCPFlag.SYN, TCPPacket.TCPFlag.ACK);
completeTCPWrite(handlerAddr, initiatiorAddr, tcpBuf, byteBufAllocator, ctx);

// Write ACK with Normal Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, 1, 1, srcAddr.getPort(), dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
TCPPacket.writePacket(tcpBuf, null, 1, 1, initiatiorAddr.getPort(),
handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(initiatiorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
} finally {
tcpBuf.release();
}
Expand All @@ -215,8 +221,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
// If `DatagramChannel` is connected then we can get
// `localAddress` and `remoteAddress` from Channel.
if (datagramChannel.isConnected()) {
srcAddr = (InetSocketAddress) ctx.channel().localAddress();
dstAddr = (InetSocketAddress) ctx.channel().remoteAddress();
initiatiorAddr = (InetSocketAddress) ctx.channel().localAddress();
handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
}
}

Expand Down Expand Up @@ -276,6 +282,16 @@ private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOpe

try {
if (isWriteOperation) {
final InetSocketAddress srcAddr;
final InetSocketAddress dstAddr;
if (isServerPipeline) {
srcAddr = handlerAddr;
dstAddr = initiatiorAddr;
} else {
srcAddr = initiatiorAddr;
dstAddr = handlerAddr;
}

TCPPacket.writePacket(tcpBuf, packet, sendSegmentNumber, receiveSegmentNumber, srcAddr.getPort(),
dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
Expand All @@ -288,17 +304,27 @@ private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOpe
completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
} else {
TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, dstAddr, srcAddr, false);
final InetSocketAddress srcAddr;
final InetSocketAddress dstAddr;
if (isServerPipeline) {
srcAddr = initiatiorAddr;
dstAddr = handlerAddr;
} else {
srcAddr = handlerAddr;
dstAddr = initiatiorAddr;
}

TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber, srcAddr.getPort(),
dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, srcAddr, dstAddr, false);

receiveSegmentNumber += bytes;

TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, srcAddr.getPort(),
dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, srcAddr, dstAddr, true);
TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, dstAddr.getPort(),
srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
}
} finally {
tcpBuf.release();
Expand Down Expand Up @@ -419,10 +445,10 @@ private void handleUDP(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = ((ByteBuf) msg).duplicate();

logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
byteBuf.readableBytes(), srcAddr, dstAddr);
byteBuf.readableBytes(), initiatiorAddr, handlerAddr);

UDPPacket.writePacket(udpBuf, byteBuf, srcAddr.getPort(), dstAddr.getPort());
completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
UDPPacket.writePacket(udpBuf, byteBuf, initiatiorAddr.getPort(), handlerAddr.getPort());
completeUDPWrite(initiatiorAddr, handlerAddr, udpBuf, ctx.alloc(), ctx);
} else {
logger.debug("Discarding Pcap Write for UDP Object: {}", msg);
}
Expand Down Expand Up @@ -490,19 +516,19 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

try {
// Write FIN+ACK with Normal Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, srcAddr.getPort(),
dstAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatiorAddr.getPort(),
handlerAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
completeTCPWrite(initiatiorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);

// Write FIN+ACK with Reversed Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
srcAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, handlerAddr.getPort(),
initiatiorAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
completeTCPWrite(handlerAddr, initiatiorAddr, tcpBuf, byteBufAllocator, ctx);

// Write ACK with Normal Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber + 1, receiveSegmentNumber + 1,
srcAddr.getPort(), dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
initiatiorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
completeTCPWrite(initiatiorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
} finally {
tcpBuf.release();
}
Expand All @@ -522,9 +548,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E

try {
// Write RST with Normal Source and Destination Address
TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, srcAddr.getPort(),
dstAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
completeTCPWrite(srcAddr, dstAddr, tcpBuf, ctx.alloc(), ctx);
TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatiorAddr.getPort(),
handlerAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
completeTCPWrite(initiatiorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
} finally {
tcpBuf.release();
}
Expand Down
155 changes: 148 additions & 7 deletions handler/src/test/java/io/netty/handler/pcap/PcapWriteHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,34 @@
package io.netty.handler.pcap;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

import java.net.Inet4Address;
Expand Down Expand Up @@ -78,13 +93,7 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
assertTrue(eventLoopGroup.shutdownGracefully().sync().isSuccess());

// Verify Pcap Global Headers
assertEquals(0xa1b2c3d4, byteBuf.readInt()); // magic_number
assertEquals(2, byteBuf.readShort()); // version_major
assertEquals(4, byteBuf.readShort()); // version_minor
assertEquals(0, byteBuf.readInt()); // thiszone
assertEquals(0, byteBuf.readInt()); // sigfigs
assertEquals(0xffff, byteBuf.readInt()); // snaplen
assertEquals(1, byteBuf.readInt()); // network
verifyGlobalHeaders(byteBuf);

// Verify Pcap Packet Header
byteBuf.readInt(); // Just read, we don't care about timestamps for now
Expand Down Expand Up @@ -138,4 +147,136 @@ protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) {
assertTrue(ipv4Packet.release());
assertTrue(udpPacket.release());
}

@Test
public void tcpV4() throws InterruptedException, ExecutionException {
final ByteBuf byteBuf = Unpooled.buffer();

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup clientGroup = new NioEventLoopGroup();

// Configure the echo server
ServerBootstrap sb = new ServerBootstrap();
final Promise<Boolean> dataReadPromise = bossGroup.next().newPromise();
sb.group(bossGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new PcapWriteHandler(new ByteBufOutputStream(byteBuf)));
p.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
dataReadPromise.setSuccess(true);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
});
}
});

// Start the server.
ChannelFuture serverChannelFuture = sb.bind(new InetSocketAddress("127.0.0.1", 0)).sync();
assertTrue(serverChannelFuture.isSuccess());

// configure the client
Bootstrap cb = new Bootstrap();
final Promise<Boolean> dataWrittenPromise = clientGroup.next().newPromise();
cb.group(clientGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.wrappedBuffer("Meow".getBytes()));
dataWrittenPromise.setSuccess(true);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
});
}
});

// Start the client.
ChannelFuture clientChannelFuture = cb.connect(serverChannelFuture.channel().localAddress()).sync();
assertTrue(clientChannelFuture.isSuccess());

assertTrue(dataWrittenPromise.await(5, TimeUnit.SECONDS));
assertTrue(dataReadPromise.await(5, TimeUnit.SECONDS));

clientChannelFuture.channel().close().sync();
serverChannelFuture.channel().close().sync();

// Shut down all event loops to terminate all threads.
assertTrue(clientGroup.shutdownGracefully().sync().isSuccess());
assertTrue(bossGroup.shutdownGracefully().sync().isSuccess());

verifyGlobalHeaders(byteBuf);

// Verify Pcap Packet Header
byteBuf.readInt(); // Just read, we don't care about timestamps for now
byteBuf.readInt(); // Just read, we don't care about timestamps for now
assertEquals(54, byteBuf.readInt()); // Length of Packet Saved In Pcap
assertEquals(54, byteBuf.readInt()); // Actual Length of Packet

// -------------------------------------------- Verify Packet --------------------------------------------
// Verify Ethernet Packet
ByteBuf ethernetPacket = byteBuf.readSlice(54);
ByteBuf dstMac = ethernetPacket.readSlice(6);
ByteBuf srcMac = ethernetPacket.readSlice(6);
assertArrayEquals(new byte[]{0, 0, 94, 0, 83, -1}, ByteBufUtil.getBytes(dstMac));
assertArrayEquals(new byte[]{0, 0, 94, 0, 83, 0}, ByteBufUtil.getBytes(srcMac));
assertEquals(0x0800, ethernetPacket.readShort());

// Verify IPv4 Packet
ByteBuf ipv4Packet = ethernetPacket.readSlice(32);
assertEquals(0x45, ipv4Packet.readByte()); // Version + IHL
assertEquals(0x00, ipv4Packet.readByte()); // DSCP
assertEquals(40, ipv4Packet.readShort()); // Length
assertEquals(0x0000, ipv4Packet.readShort()); // Identification
assertEquals(0x0000, ipv4Packet.readShort()); // Fragment
assertEquals((byte) 0xff, ipv4Packet.readByte()); // TTL
assertEquals((byte) 6, ipv4Packet.readByte()); // Protocol
assertEquals(0, ipv4Packet.readShort()); // Checksum
InetSocketAddress serverAddr = (InetSocketAddress) serverChannelFuture.channel().localAddress();
// Source IPv4 Address
assertEquals(NetUtil.ipv4AddressToInt((Inet4Address) serverAddr.getAddress()), ipv4Packet.readInt());
// Destination IPv4 Address
ipv4Packet.readInt();

InetSocketAddress clientAddr = (InetSocketAddress) clientChannelFuture.channel().localAddress();

// Verify ports
ByteBuf tcpPacket = ipv4Packet.readSlice(12);
assertEquals(clientAddr.getPort() & 0xffff, tcpPacket.readUnsignedShort()); // Source Port
assertEquals(serverAddr.getPort() & 0xffff, tcpPacket.readUnsignedShort()); // Destination Port
}

private static void verifyGlobalHeaders(ByteBuf byteBuf) {
assertEquals(0xa1b2c3d4, byteBuf.readInt()); // magic_number
assertEquals(2, byteBuf.readShort()); // version_major
assertEquals(4, byteBuf.readShort()); // version_minor
assertEquals(0, byteBuf.readInt()); // thiszone
assertEquals(0, byteBuf.readInt()); // sigfigs
assertEquals(0xffff, byteBuf.readInt()); // snaplen
assertEquals(1, byteBuf.readInt()); // network
}
}

0 comments on commit 54a2c6d

Please sign in to comment.