Skip to content

Commit 13f849d

Browse files
committed
feat(tools/perf): log cpu and memory usages (#2607)
* feat: introduce `CpuMonitor` Signed-off-by: Ning Yu <ningyu@automq.com> * feat: introduce `MemoryMonitor` Signed-off-by: Ning Yu <ningyu@automq.com> * feat(tools/perf): log cpu and memory usages Signed-off-by: Ning Yu <ningyu@automq.com> --------- Signed-off-by: Ning Yu <ningyu@automq.com>
1 parent 19996bc commit 13f849d

File tree

5 files changed

+86
-6
lines changed

5 files changed

+86
-6
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,6 +2253,7 @@ project(':tools') {
22532253
exclude group: 'org.apache.kafka', module: 'kafka-clients'
22542254
}
22552255
implementation libs.bucket4j
2256+
implementation libs.oshi
22562257
// AutoMQ inject end
22572258

22582259
// for SASL/OAUTHBEARER JWT validation

gradle/dependencies.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ versions += [
179179
hdrHistogram:"2.1.12",
180180
nettyTcnativeBoringSsl: "2.0.65.Final",
181181
confluentSchema: "7.4.0",
182+
oshi: "6.8.1",
182183
// AutoMQ inject end
183184

184185
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
@@ -312,4 +313,5 @@ libs += [
312313
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
313314
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
314315
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
316+
oshi: "com.github.oshi:oshi-core:$versions.oshi",
315317
]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.apache.kafka.tools.automq.perf;
2+
3+
import oshi.SystemInfo;
4+
import oshi.hardware.CentralProcessor;
5+
6+
public class CpuMonitor {
7+
private final CentralProcessor processor;
8+
private long[] prevTicks;
9+
10+
public CpuMonitor() {
11+
this.processor = new SystemInfo().getHardware().getProcessor();
12+
this.prevTicks = processor.getSystemCpuLoadTicks();
13+
}
14+
15+
/**
16+
* Returns the CPU usage between the last call of this method and now.
17+
* It returns -1.0 if an error occurs.
18+
*
19+
* @return CPU load between 0 and 1 (100%)
20+
*/
21+
public synchronized double usage() {
22+
try {
23+
return usage0();
24+
} catch (Exception e) {
25+
return -1.0;
26+
}
27+
}
28+
29+
private double usage0() {
30+
long[] currTicks = processor.getSystemCpuLoadTicks();
31+
double usage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
32+
prevTicks = currTicks;
33+
return usage;
34+
}
35+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.apache.kafka.tools.automq.perf;
2+
3+
import java.lang.management.BufferPoolMXBean;
4+
import java.lang.management.ManagementFactory;
5+
import java.util.List;
6+
7+
public class MemoryMonitor {
8+
9+
/**
10+
* Returns the amount of heap memory used by the JVM in bytes.
11+
*/
12+
public static long heapUsed() {
13+
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
14+
}
15+
16+
/**
17+
* Returns the amount of direct memory used by the JVM in bytes.
18+
*/
19+
public static long directUsed() {
20+
List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
21+
return pools.stream()
22+
.filter(p -> "direct".equals(p.getName()))
23+
.mapToLong(BufferPoolMXBean::getMemoryUsed)
24+
.sum();
25+
}
26+
}

tools/src/main/java/org/apache/kafka/tools/automq/perf/StatsCollector.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,23 @@ public class StatsCollector {
4242
private static final double BYTES_PER_MB = 1 << 20;
4343
private static final double BYTES_PER_GB = 1 << 30;
4444

45-
// max to 9999.9 (167 min)
45+
// max to 9999.9 s (167 min)
4646
private static final DecimalFormat DURATION_FORMAT = new PaddingDecimalFormat("0.0", 6);
47-
// max to 999999.99 (1M msg/s)
47+
// max to 99.99%
48+
private static final DecimalFormat PERCENT_FORMAT = new PaddingDecimalFormat("0.00", 5);
49+
// max to 99999 MiB (100 GiB)
50+
private static final DecimalFormat MEMORY_FORMAT = new PaddingDecimalFormat("0", 5);
51+
// max to 999999.99 msg/s (1M msg/s)
4852
private static final DecimalFormat RATE_FORMAT = new PaddingDecimalFormat("0.00", 9);
49-
// max to 999.99 (1 GB/s)
53+
// max to 999.99 MiB/s (1 GiB/s)
5054
private static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.00", 6);
51-
// max to 99999.999 (100 s)
55+
// max to 99999.999 ms (100 s)
5256
private static final DecimalFormat LATENCY_FORMAT = new PaddingDecimalFormat("0.000", 9);
57+
// max to 999.99
5358
private static final DecimalFormat COUNT_FORMAT = new PaddingDecimalFormat("0.00", 6);
5459

5560
private static final String PERIOD_LOG_FORMAT = "{}s" +
61+
" | CPU {}% | Mem {} MiB heap / {} MiB direct" +
5662
" | Prod rate {} msg/s / {} MiB/s | Prod err {} err/s" +
5763
" | Cons rate {} msg/s / {} MiB/s | Backlog: {} K msg" +
5864
" | Prod Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}" +
@@ -68,6 +74,7 @@ public class StatsCollector {
6874
public static Result printAndCollectStats(Stats stats, StopCondition condition, long intervalNanos,
6975
PerfConfig config) {
7076
final long start = System.nanoTime();
77+
CpuMonitor cpu = new CpuMonitor();
7178
Result result = new Result(config);
7279

7380
long last = start;
@@ -82,7 +89,7 @@ public static Result printAndCollectStats(Stats stats, StopCondition condition,
8289
double elapsed = (periodStats.nowNanos - last) / NANOS_PER_SEC;
8390
double elapsedTotal = (periodStats.nowNanos - start) / NANOS_PER_SEC;
8491

85-
PeriodResult periodResult = new PeriodResult(periodStats, elapsed, config.groupsPerTopic);
92+
PeriodResult periodResult = new PeriodResult(cpu, periodStats, elapsed, config.groupsPerTopic);
8693
result.update(periodResult, elapsedTotal);
8794
periodResult.logIt(elapsedTotal);
8895

@@ -239,6 +246,9 @@ private void update(CumulativeResult cumulativeResult) {
239246
}
240247

241248
private static class PeriodResult {
249+
private final double cpuUsage;
250+
private final long heapMemoryUsed;
251+
private final long directMemoryUsed;
242252
private final double produceRate;
243253
private final double produceThroughputBps;
244254
private final double errorRate;
@@ -264,7 +274,10 @@ private static class PeriodResult {
264274
private final double endToEndLatency9999thMicros;
265275
private final double endToEndLatencyMaxMicros;
266276

267-
private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
277+
private PeriodResult(CpuMonitor cpu, PeriodStats stats, double elapsed, int readWriteRatio) {
278+
this.cpuUsage = cpu.usage();
279+
this.heapMemoryUsed = MemoryMonitor.heapUsed();
280+
this.directMemoryUsed = MemoryMonitor.directUsed();
268281
this.produceRate = stats.messagesSent / elapsed;
269282
this.produceThroughputBps = stats.bytesSent / elapsed;
270283
this.errorRate = stats.messagesSendFailed / elapsed;
@@ -294,6 +307,9 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
294307
private void logIt(double elapsedTotal) {
295308
LOGGER.info(PERIOD_LOG_FORMAT,
296309
DURATION_FORMAT.format(elapsedTotal),
310+
PERCENT_FORMAT.format(cpuUsage * 100),
311+
MEMORY_FORMAT.format(heapMemoryUsed / BYTES_PER_MB),
312+
MEMORY_FORMAT.format(directMemoryUsed / BYTES_PER_MB),
297313
RATE_FORMAT.format(produceRate),
298314
THROUGHPUT_FORMAT.format(produceThroughputBps / BYTES_PER_MB),
299315
RATE_FORMAT.format(errorRate),

0 commit comments

Comments
 (0)