Skip to content

Commit

Permalink
Merge pull request #119 from gregschohn/StreamingTrafficReplay
Browse files Browse the repository at this point in the history
Allow reading from stdin & writing to stdout.  Also pulled log4j2 and beust.jcommander
  • Loading branch information
gregschohn authored Mar 28, 2023
2 parents 6836eed + 0f03e13 commit f792138
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 107 deletions.
13 changes: 13 additions & 0 deletions TrafficReplayer/TrafficReplayer.iml
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,18 @@
<orderEntry type="library" name="Maven: org.apiguardian:apiguardian-api:1.1.2" level="project" />
<orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
<orderEntry type="library" name="Maven: org.json:json:20220924" level="project" />
<orderEntry type="library" name="json" level="project" />
<orderEntry type="library" name="beust.jcommander" level="project" />
<orderEntry type="module-library">
<library>
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/projectlombok/lombok/1.18.22/lombok-1.18.22.jar!/" />
</CLASSES>
<JAVADOC />
<SOURCES />
</library>
</orderEntry>
<orderEntry type="library" name="apache.logging.log4j.core" level="project" />
<orderEntry type="library" name="apache.logging.log4j.api" level="project" />
</component>
</module>
17 changes: 17 additions & 0 deletions TrafficReplayer/src/log4j2.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
status = error


appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n

appender.file.type = File
appender.file.name = LOGFILE
appender.file.fileName=${filename}/propertieslogs.log
appender.file.layout.type=PatternLayout
appender.file.layout.pattern=[%-5level] %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} - %msg%n

rootLogger.level = warn
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.opensearch.migrations.replay;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class CloseableStringStreamWrapper implements Closeable {
private final Closeable underlyingCloseableResource;
private final Stream<String> underlyingStream;

public CloseableStringStreamWrapper(Closeable underlyingCloseableResource, Stream<String> underlyingStream) {
this.underlyingCloseableResource = underlyingCloseableResource;
this.underlyingStream = underlyingStream;
}

static CloseableStringStreamWrapper generateStreamFromBufferedReader(BufferedReader br) {
return new CloseableStringStreamWrapper(br, Stream.generate((Supplier) () -> {
try {
return br.readLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).takeWhile(s -> s != null));
}

@Override
public void close() throws IOException {
underlyingCloseableResource.close();
}

public Stream<String> stream() {
return underlyingStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.log4j.Log4j2;
import org.opensearch.migrations.replay.netty.BacksideHttpWatcherHandler;
import org.opensearch.migrations.replay.netty.BacksideSnifferHandler;

Expand All @@ -20,7 +21,7 @@
import java.time.Instant;
import java.util.function.Consumer;


@Log4j2
public class NettyPacketToHttpHandler implements IPacketToHttpHandler {

ChannelFuture outboundChannelFuture;
Expand All @@ -35,50 +36,47 @@ public class NettyPacketToHttpHandler implements IPacketToHttpHandler {
.channel(NioSocketChannel.class)
.handler(new BacksideSnifferHandler(responseBuilder))
.option(ChannelOption.AUTO_READ, false);
System.err.println("Active - setting up backend connection");
log.trace("Active - setting up backend connection");
outboundChannelFuture = b.connect(serverUri.getHost(), serverUri.getPort());
//outboundChannel = outboundChannelFuture.channel();
responseWatchHandler = new BacksideHttpWatcherHandler(responseBuilder);

outboundChannelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// connection complete start to read first data
System.err.println("Done setting up backend channel & it was successful");
log.trace("Done setting up backend channel & it was successful");
var pipeline = future.channel().pipeline();
pipeline.addFirst(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new HttpResponseDecoder());
pipeline.addLast(new LoggingHandler(LogLevel.WARN));
// TODO - switch this out to use less memory.
// We only need to know when the response has been fully received, not the contents
// since we're already logging those in the sniffer earlier in the pipeline.
pipeline.addLast(new HttpObjectAggregator(1024*1024));
pipeline.addLast(responseWatchHandler);
pipeline.addLast(new LoggingHandler(LogLevel.ERROR));
} else {
// Close the connection if the connection attempt has failed.
System.err.println("closing outbound channel because CONNECT future was not successful");
log.error("closing outbound channel because CONNECT future was not successful");
}
});
}

@Override
public void consumeBytes(byte[] packetData) throws InvalidHttpStateException {
System.err.println("Writing packetData="+packetData);
log.trace("Writing packetData["+packetData+"]");
var packet = Unpooled.wrappedBuffer(packetData);
if (outboundChannelFuture.isDone()) {
Channel channel = outboundChannelFuture.channel();
if (!channel.isActive()) {
System.err.println("Channel is not active - future packets for this connection will be dropped.");
System.err.println("Need to do more sophisticated tracking of progress and retry further up the stack");
log.warn("Channel is not active - future packets for this connection will be dropped.");
log.warn("Need to do more sophisticated tracking of progress and retry further up the stack");
return;
}
System.err.println("Writing data to backside handler");
log.trace("Writing data to backside handler");
channel.writeAndFlush(packet)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
System.err.println("packet write was successful: "+packetData);
log.trace("packet write was successful: "+packetData);
} else {
System.err.println("closing outbound channel because WRITE future was not successful "+future.cause());
log.warn("closing outbound channel because WRITE future was not successful "+future.cause());
future.channel().close(); // close the backside
}
});
Expand Down
Loading

0 comments on commit f792138

Please sign in to comment.