Skip to content

Commit

Permalink
Merge #2953 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 2, 2023
2 parents 8ea3f16 + 45201ec commit fbd8aca
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,8 +62,6 @@ public class MicrometerChannelMetricsRecorder implements ChannelMetricsRecorder

final ConcurrentMap<String, LongAdder> totalConnectionsCache = new ConcurrentHashMap<>();

final LongAdder totalConnectionsAdder = new LongAdder();

final String name;
final String protocol;

Expand Down Expand Up @@ -202,10 +200,11 @@ public String protocol() {
}

@Nullable
private LongAdder getTotalConnectionsAdder(SocketAddress serverAddress) {
LongAdder getTotalConnectionsAdder(SocketAddress serverAddress) {
String address = reactor.netty5.Metrics.formatSocketAddress(serverAddress);
return MapUtils.computeIfAbsent(totalConnectionsCache, address,
key -> {
LongAdder totalConnectionsAdder = new LongAdder();
Gauge gauge = filter(Gauge.builder(name + CONNECTIONS_TOTAL, totalConnectionsAdder, LongAdder::longValue)
.tags(ChannelMeters.ConnectionsTotalMeterTags.URI.asString(), protocol,
ChannelMeters.ConnectionsTotalMeterTags.LOCAL_ADDRESS.asString(), address)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5.channel;

import org.junit.jupiter.api.Test;
import reactor.netty5.transport.AddressUtils;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.LongAdder;

import static org.assertj.core.api.Assertions.assertThat;

class MicrometerChannelMetricsRecorderTests {
static final InetSocketAddress ADDRESS_1 = AddressUtils.createUnresolved("127.0.0.1", 80);
static final InetSocketAddress ADDRESS_2 = AddressUtils.createUnresolved("0:0:0:0:0:0:0:1", 80);

@Test
void testGetTotalConnectionsAdder() {
MicrometerChannelMetricsRecorder recorder = new MicrometerChannelMetricsRecorder("test", "test");

LongAdder longAdder1 = recorder.getTotalConnectionsAdder(ADDRESS_1);

LongAdder longAdder2 = recorder.getTotalConnectionsAdder(ADDRESS_2);

assertThat(longAdder1).isNotSameAs(longAdder2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ final class MicrometerHttpServerMetricsRecorder extends MicrometerHttpMetricsRec

static final MicrometerHttpServerMetricsRecorder INSTANCE = new MicrometerHttpServerMetricsRecorder();
private static final String PROTOCOL_VALUE_HTTP = "http";
private final LongAdder activeConnectionsAdder = new LongAdder();
private final LongAdder activeStreamsAdder = new LongAdder();
private final ConcurrentMap<String, LongAdder> activeConnectionsCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, LongAdder> activeStreamsCache = new ConcurrentHashMap<>();
private final ConcurrentMap<String, DistributionSummary> dataReceivedCache = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -206,10 +204,11 @@ public void recordResolveAddressTime(SocketAddress remoteAddress, Duration time,
}

@Nullable
private LongAdder getActiveStreamsAdder(SocketAddress localAddress) {
LongAdder getActiveStreamsAdder(SocketAddress localAddress) {
String address = reactor.netty5.Metrics.formatSocketAddress(localAddress);
return MapUtils.computeIfAbsent(activeStreamsCache, address,
key -> {
LongAdder activeStreamsAdder = new LongAdder();
Gauge gauge = filter(
Gauge.builder(STREAMS_ACTIVE.getName(), activeStreamsAdder, LongAdder::longValue)
.tags(HttpServerMeters.StreamsActiveTags.URI.asString(), PROTOCOL_VALUE_HTTP,
Expand All @@ -220,10 +219,11 @@ private LongAdder getActiveStreamsAdder(SocketAddress localAddress) {
}

@Nullable
private LongAdder getServerConnectionAdder(SocketAddress localAddress) {
LongAdder getServerConnectionAdder(SocketAddress localAddress) {
String address = reactor.netty5.Metrics.formatSocketAddress(localAddress);
return MapUtils.computeIfAbsent(activeConnectionsCache, address,
key -> {
LongAdder activeConnectionsAdder = new LongAdder();
Gauge gauge = filter(
Gauge.builder(CONNECTIONS_ACTIVE.getName(), activeConnectionsAdder, LongAdder::longValue)
.tags(HttpServerMeters.ConnectionsActiveTags.URI.asString(), PROTOCOL_VALUE_HTTP,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.netty5.http.server;

import org.junit.jupiter.api.Test;
import reactor.netty5.transport.AddressUtils;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.LongAdder;

import static org.assertj.core.api.Assertions.assertThat;

class MicrometerHttpServerMetricsRecorderTests {
static final InetSocketAddress ADDRESS_1 = AddressUtils.createUnresolved("127.0.0.1", 80);
static final InetSocketAddress ADDRESS_2 = AddressUtils.createUnresolved("0:0:0:0:0:0:0:1", 80);

@Test
void testGetServerConnectionAdder() {
LongAdder longAdder1 = MicrometerHttpServerMetricsRecorder.INSTANCE.getServerConnectionAdder(ADDRESS_1);

LongAdder longAdder2 = MicrometerHttpServerMetricsRecorder.INSTANCE.getServerConnectionAdder(ADDRESS_2);

assertThat(longAdder1).isNotSameAs(longAdder2);
}

@Test
void testGetActiveStreamsAdder() {
LongAdder longAdder1 = MicrometerHttpServerMetricsRecorder.INSTANCE.getActiveStreamsAdder(ADDRESS_1);

LongAdder longAdder2 = MicrometerHttpServerMetricsRecorder.INSTANCE.getActiveStreamsAdder(ADDRESS_2);

assertThat(longAdder1).isNotSameAs(longAdder2);
}
}

0 comments on commit fbd8aca

Please sign in to comment.