Skip to content

Commit 18acce9

Browse files
committed
feat(tools/perf): log cpu and memory usages (#2607)
* feat: introduce `CpuMonitor` Signed-off-by: Ning Yu <ningyu@automq.com> * feat: introduce `MemoryMonitor` Signed-off-by: Ning Yu <ningyu@automq.com> * feat(tools/perf): log cpu and memory usages Signed-off-by: Ning Yu <ningyu@automq.com> --------- Signed-off-by: Ning Yu <ningyu@automq.com>
1 parent 68361d8 commit 18acce9

File tree

5 files changed

+86
-6
lines changed

5 files changed

+86
-6
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2293,6 +2293,7 @@ project(':tools') {
22932293
exclude group: 'org.apache.kafka', module: 'kafka-clients'
22942294
}
22952295
implementation libs.bucket4j
2296+
implementation libs.oshi
22962297
// AutoMQ inject end
22972298

22982299
// for SASL/OAUTHBEARER JWT validation

gradle/dependencies.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ versions += [
182182
confluentSchema: "7.8.0",
183183
iceberg: "1.6.1",
184184
wire: "4.9.1",
185+
oshi: "6.8.1",
185186
// AutoMQ inject end
186187

187188
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
@@ -315,4 +316,5 @@ libs += [
315316
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
316317
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
317318
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
319+
oshi: "com.github.oshi:oshi-core:$versions.oshi",
318320
]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.apache.kafka.tools.automq.perf;
2+
3+
import oshi.SystemInfo;
4+
import oshi.hardware.CentralProcessor;
5+
6+
public class CpuMonitor {
7+
private final CentralProcessor processor;
8+
private long[] prevTicks;
9+
10+
public CpuMonitor() {
11+
this.processor = new SystemInfo().getHardware().getProcessor();
12+
this.prevTicks = processor.getSystemCpuLoadTicks();
13+
}
14+
15+
/**
16+
* Returns the CPU usage between the last call of this method and now.
17+
* It returns -1.0 if an error occurs.
18+
*
19+
* @return CPU load between 0 and 1 (100%)
20+
*/
21+
public synchronized double usage() {
22+
try {
23+
return usage0();
24+
} catch (Exception e) {
25+
return -1.0;
26+
}
27+
}
28+
29+
private double usage0() {
30+
long[] currTicks = processor.getSystemCpuLoadTicks();
31+
double usage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
32+
prevTicks = currTicks;
33+
return usage;
34+
}
35+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.apache.kafka.tools.automq.perf;
2+
3+
import java.lang.management.BufferPoolMXBean;
4+
import java.lang.management.ManagementFactory;
5+
import java.util.List;
6+
7+
public class MemoryMonitor {
8+
9+
/**
10+
* Returns the amount of heap memory used by the JVM in bytes.
11+
*/
12+
public static long heapUsed() {
13+
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
14+
}
15+
16+
/**
17+
* Returns the amount of direct memory used by the JVM in bytes.
18+
*/
19+
public static long directUsed() {
20+
List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
21+
return pools.stream()
22+
.filter(p -> "direct".equals(p.getName()))
23+
.mapToLong(BufferPoolMXBean::getMemoryUsed)
24+
.sum();
25+
}
26+
}

tools/src/main/java/org/apache/kafka/tools/automq/perf/StatsCollector.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,23 @@ public class StatsCollector {
4343
private static final double BYTES_PER_MB = 1 << 20;
4444
private static final double BYTES_PER_GB = 1 << 30;
4545

46-
// max to 9999.9 (167 min)
46+
// max to 9999.9 s (167 min)
4747
private static final DecimalFormat DURATION_FORMAT = new PaddingDecimalFormat("0.0", 6);
48-
// max to 999999.99 (1M msg/s)
48+
// max to 99.99%
49+
private static final DecimalFormat PERCENT_FORMAT = new PaddingDecimalFormat("0.00", 5);
50+
// max to 99999 MiB (100 GiB)
51+
private static final DecimalFormat MEMORY_FORMAT = new PaddingDecimalFormat("0", 5);
52+
// max to 999999.99 msg/s (1M msg/s)
4953
private static final DecimalFormat RATE_FORMAT = new PaddingDecimalFormat("0.00", 9);
50-
// max to 999.99 (1 GB/s)
54+
// max to 999.99 MiB/s (1 GiB/s)
5155
private static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.00", 6);
52-
// max to 99999.999 (100 s)
56+
// max to 99999.999 ms (100 s)
5357
private static final DecimalFormat LATENCY_FORMAT = new PaddingDecimalFormat("0.000", 9);
58+
// max to 999.99
5459
private static final DecimalFormat COUNT_FORMAT = new PaddingDecimalFormat("0.00", 6);
5560

5661
private static final String PERIOD_LOG_FORMAT = "{}s" +
62+
" | CPU {}% | Mem {} MiB heap / {} MiB direct" +
5763
" | Prod rate {} msg/s / {} MiB/s | Prod err {} err/s" +
5864
" | Cons rate {} msg/s / {} MiB/s | Backlog: {} K msg" +
5965
" | Prod Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}" +
@@ -69,6 +75,7 @@ public class StatsCollector {
6975
public static Result printAndCollectStats(Stats stats, StopCondition condition, long intervalNanos,
7076
PerfConfig config) {
7177
final long start = System.nanoTime();
78+
CpuMonitor cpu = new CpuMonitor();
7279
Result result = new Result(config);
7380

7481
long last = start;
@@ -83,7 +90,7 @@ public static Result printAndCollectStats(Stats stats, StopCondition condition,
8390
double elapsed = (periodStats.nowNanos - last) / NANOS_PER_SEC;
8491
double elapsedTotal = (periodStats.nowNanos - start) / NANOS_PER_SEC;
8592

86-
PeriodResult periodResult = new PeriodResult(periodStats, elapsed, config.groupsPerTopic);
93+
PeriodResult periodResult = new PeriodResult(cpu, periodStats, elapsed, config.groupsPerTopic);
8794
result.update(periodResult, elapsedTotal);
8895
periodResult.logIt(elapsedTotal);
8996

@@ -245,6 +252,9 @@ private void update(CumulativeResult cumulativeResult) {
245252
}
246253

247254
private static class PeriodResult {
255+
private final double cpuUsage;
256+
private final long heapMemoryUsed;
257+
private final long directMemoryUsed;
248258
private final double produceRate;
249259
private final double produceThroughputBps;
250260
private final double errorRate;
@@ -270,7 +280,10 @@ private static class PeriodResult {
270280
private final double endToEndLatency9999thMicros;
271281
private final double endToEndLatencyMaxMicros;
272282

273-
private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
283+
private PeriodResult(CpuMonitor cpu, PeriodStats stats, double elapsed, int readWriteRatio) {
284+
this.cpuUsage = cpu.usage();
285+
this.heapMemoryUsed = MemoryMonitor.heapUsed();
286+
this.directMemoryUsed = MemoryMonitor.directUsed();
274287
this.produceRate = stats.messagesSent / elapsed;
275288
this.produceThroughputBps = stats.bytesSent / elapsed;
276289
this.errorRate = stats.messagesSendFailed / elapsed;
@@ -300,6 +313,9 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
300313
private void logIt(double elapsedTotal) {
301314
LOGGER.info(PERIOD_LOG_FORMAT,
302315
DURATION_FORMAT.format(elapsedTotal),
316+
PERCENT_FORMAT.format(cpuUsage * 100),
317+
MEMORY_FORMAT.format(heapMemoryUsed / BYTES_PER_MB),
318+
MEMORY_FORMAT.format(directMemoryUsed / BYTES_PER_MB),
303319
RATE_FORMAT.format(produceRate),
304320
THROUGHPUT_FORMAT.format(produceThroughputBps / BYTES_PER_MB),
305321
RATE_FORMAT.format(errorRate),

0 commit comments

Comments
 (0)