Skip to content

Commit

Permalink
Merge #2954 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 2, 2023
2 parents fbd8aca + 8dfc7e1 commit 68f3048
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package reactor.netty5.http.server;

import io.netty5.buffer.Buffer;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelHandlerAdapter;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.handler.codec.http.HttpContent;
Expand All @@ -30,6 +31,7 @@
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

import java.net.SocketAddress;
import java.time.Duration;
import java.util.function.Function;

Expand Down Expand Up @@ -75,6 +77,7 @@ public void channelActive(ChannelHandlerContext ctx) {
// not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class.
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
Expand All @@ -91,6 +94,7 @@ public void channelActive(ChannelHandlerContext ctx) {
public void channelInactive(ChannelHandlerContext ctx) {
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
catch (RuntimeException e) {
Expand All @@ -100,10 +104,9 @@ public void channelInactive(ChannelHandlerContext ctx) {
// Allow request-response exchange to continue, unaffected by metrics problem
}
}
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations ops) {
recordInactiveConnectionOrStream(ops);
}

recordInactiveConnectionOrStream(ctx.channel());

ctx.fireChannelInactive();
}

Expand Down Expand Up @@ -142,9 +145,10 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
recordInactiveConnectionOrStream(ops);
}

recordInactiveConnectionOrStream(ctx.channel());

dataSent = 0;
});
}
Expand All @@ -164,15 +168,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
ChannelOperations<?, ?> channelOps = ChannelOperations.get(ctx.channel());
if (channelOps instanceof HttpServerOperations ops) {
channelActivated = true;
if (ops.isHttp2()) {
recordOpenStream(ops);
}
else {
recordActiveConnection(ops);
}
startRead(ops, uriTagValue == null ? ops.path : uriTagValue.apply(ops.path), ops.method().name());
}

channelActivated = true;
if (ctx.channel() instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordOpenStream(ctx.channel().localAddress());
}
else {
// Always use the real connection local address without any proxy information
recordActiveConnection(ctx.channel().localAddress());
}
}

dataReceived += extractProcessedDataFromBuffer(msg);
Expand Down Expand Up @@ -257,20 +264,20 @@ protected void recordWrite(HttpServerOperations ops, String path, String method,
recorder().recordDataSent(ops.remoteSocketAddress(), path, dataSent);
}

protected void recordActiveConnection(HttpServerOperations ops) {
recorder().recordServerConnectionActive(ops.hostSocketAddress());
protected void recordActiveConnection(SocketAddress localAddress) {
recorder().recordServerConnectionActive(localAddress);
}

protected void recordInactiveConnection(HttpServerOperations ops) {
recorder().recordServerConnectionInactive(ops.hostSocketAddress());
protected void recordInactiveConnection(SocketAddress localAddress) {
recorder().recordServerConnectionInactive(localAddress);
}

protected void recordOpenStream(HttpServerOperations ops) {
recorder().recordStreamOpened(ops.hostSocketAddress());
protected void recordOpenStream(SocketAddress localAddress) {
recorder().recordStreamOpened(localAddress);
}

protected void recordClosedStream(HttpServerOperations ops) {
recorder().recordStreamClosed(ops.hostSocketAddress());
protected void recordClosedStream(SocketAddress localAddress) {
recorder().recordStreamClosed(localAddress);
}

protected void startRead(HttpServerOperations ops, String path, String method) {
Expand All @@ -281,20 +288,22 @@ protected void startWrite(HttpServerOperations ops, String path, String method,
dataSentTime = System.nanoTime();
}

void recordInactiveConnectionOrStream(HttpServerOperations ops) {
void recordInactiveConnectionOrStream(Channel channel) {
if (channelActivated) {
channelActivated = false;
try {
if (ops.isHttp2()) {
recordClosedStream(ops);
if (channel instanceof Http2StreamChannel) {
// Always use the real connection local address without any proxy information
recordClosedStream(channel.localAddress());
}
else {
recordInactiveConnection(ops);
// Always use the real connection local address without any proxy information
recordInactiveConnection(channel.localAddress());
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
log.warn(format(ops.channel(), "Exception caught while recording metrics."), e);
log.warn(format(channel, "Exception caught while recording metrics."), e);
}
// Allow request-response exchange to continue, unaffected by metrics problem
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco
ServerCloseHandler.INSTANCE.register(cnx.channel());
});

disposableServer = server.bindNow();
disposableServer = server.forwarded(true).bindNow();

AtomicReference<SocketAddress> clientAddress = new AtomicReference<>();
httpClient = httpClient
Expand All @@ -662,6 +662,7 @@ void testServerConnectionsMicrometer(HttpProtocol[] serverProtocols, HttpProtoco

customizeClientOptions(httpClient, clientCtx, clientProtocols)
.metrics(true, Function.identity())
.headers(h -> h.add("X-Forwarded-Host", "192.168.0.1"))
.post()
.uri(uri)
.send(body)
Expand Down Expand Up @@ -1001,7 +1002,7 @@ public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
}

private void checkServerConnectionsMicrometer(HttpServerRequest request) {
String address = formatSocketAddress(request.hostAddress());
String address = formatSocketAddress(request.connectionHostAddress());
boolean isHttp2 = request.requestHeaders().contains(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text());
checkGauge(SERVER_CONNECTIONS_TOTAL, true, 1, URI, HTTP, LOCAL_ADDRESS, address);
if (isHttp2) {
Expand Down

0 comments on commit 68f3048

Please sign in to comment.