Skip to content

feat(tools/perf): log cpu and memory usages #2607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,7 @@ project(':tools') {
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
implementation libs.bucket4j
implementation libs.oshi
// AutoMQ inject end

implementation project(':storage')
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ versions += [
confluentSchema: "7.8.0",
iceberg: "1.6.1",
wire: "4.9.1",
oshi: "6.8.1",
// AutoMQ inject end

junitPlatform: "1.10.2"
Expand Down Expand Up @@ -305,4 +306,5 @@ libs += [
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
oshi: "com.github.oshi:oshi-core:$versions.oshi",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.apache.kafka.tools.automq.perf;

import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;

public class CpuMonitor {
private final CentralProcessor processor;
private long[] prevTicks;

public CpuMonitor() {
this.processor = new SystemInfo().getHardware().getProcessor();
this.prevTicks = processor.getSystemCpuLoadTicks();
}

/**
* Returns the CPU usage between the last call of this method and now.
* It returns -1.0 if an error occurs.
*
* @return CPU load between 0 and 1 (100%)
*/
public synchronized double usage() {
try {
return usage0();
} catch (Exception e) {
return -1.0;
}
}

private double usage0() {
long[] currTicks = processor.getSystemCpuLoadTicks();
double usage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
prevTicks = currTicks;
return usage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.kafka.tools.automq.perf;

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class MemoryMonitor {

/**
* Returns the amount of heap memory used by the JVM in bytes.
*/
public static long heapUsed() {
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
}

/**
* Returns the amount of direct memory used by the JVM in bytes.
*/
public static long directUsed() {
List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
return pools.stream()
.filter(p -> "direct".equals(p.getName()))
.mapToLong(BufferPoolMXBean::getMemoryUsed)
.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,23 @@ public class StatsCollector {
private static final double BYTES_PER_MB = 1 << 20;
private static final double BYTES_PER_GB = 1 << 30;

// max to 9999.9 (167 min)
// max to 9999.9 s (167 min)
private static final DecimalFormat DURATION_FORMAT = new PaddingDecimalFormat("0.0", 6);
// max to 999999.99 (1M msg/s)
// max to 99.99%
private static final DecimalFormat PERCENT_FORMAT = new PaddingDecimalFormat("0.00", 5);
// max to 99999 MiB (100 GiB)
private static final DecimalFormat MEMORY_FORMAT = new PaddingDecimalFormat("0", 5);
// max to 999999.99 msg/s (1M msg/s)
private static final DecimalFormat RATE_FORMAT = new PaddingDecimalFormat("0.00", 9);
// max to 999.99 (1 GB/s)
// max to 999.99 MiB/s (1 GiB/s)
private static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.00", 6);
// max to 99999.999 (100 s)
// max to 99999.999 ms (100 s)
private static final DecimalFormat LATENCY_FORMAT = new PaddingDecimalFormat("0.000", 9);
// max to 999.99
private static final DecimalFormat COUNT_FORMAT = new PaddingDecimalFormat("0.00", 6);

private static final String PERIOD_LOG_FORMAT = "{}s" +
" | CPU {}% | Mem {} MiB heap / {} MiB direct" +
" | Prod rate {} msg/s / {} MiB/s | Prod err {} err/s" +
" | Cons rate {} msg/s / {} MiB/s | Backlog: {} K msg" +
" | Prod Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}" +
Expand All @@ -69,6 +75,7 @@ public class StatsCollector {
public static Result printAndCollectStats(Stats stats, StopCondition condition, long intervalNanos,
PerfConfig config) {
final long start = System.nanoTime();
CpuMonitor cpu = new CpuMonitor();
Result result = new Result(config);

long last = start;
Expand All @@ -83,7 +90,7 @@ public static Result printAndCollectStats(Stats stats, StopCondition condition,
double elapsed = (periodStats.nowNanos - last) / NANOS_PER_SEC;
double elapsedTotal = (periodStats.nowNanos - start) / NANOS_PER_SEC;

PeriodResult periodResult = new PeriodResult(periodStats, elapsed, config.groupsPerTopic);
PeriodResult periodResult = new PeriodResult(cpu, periodStats, elapsed, config.groupsPerTopic);
result.update(periodResult, elapsedTotal);
periodResult.logIt(elapsedTotal);

Expand Down Expand Up @@ -245,6 +252,9 @@ private void update(CumulativeResult cumulativeResult) {
}

private static class PeriodResult {
private final double cpuUsage;
private final long heapMemoryUsed;
private final long directMemoryUsed;
private final double produceRate;
private final double produceThroughputBps;
private final double errorRate;
Expand All @@ -270,7 +280,10 @@ private static class PeriodResult {
private final double endToEndLatency9999thMicros;
private final double endToEndLatencyMaxMicros;

private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
private PeriodResult(CpuMonitor cpu, PeriodStats stats, double elapsed, int readWriteRatio) {
this.cpuUsage = cpu.usage();
this.heapMemoryUsed = MemoryMonitor.heapUsed();
this.directMemoryUsed = MemoryMonitor.directUsed();
this.produceRate = stats.messagesSent / elapsed;
this.produceThroughputBps = stats.bytesSent / elapsed;
this.errorRate = stats.messagesSendFailed / elapsed;
Expand Down Expand Up @@ -300,6 +313,9 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
private void logIt(double elapsedTotal) {
LOGGER.info(PERIOD_LOG_FORMAT,
DURATION_FORMAT.format(elapsedTotal),
PERCENT_FORMAT.format(cpuUsage * 100),
MEMORY_FORMAT.format(heapMemoryUsed / BYTES_PER_MB),
MEMORY_FORMAT.format(directMemoryUsed / BYTES_PER_MB),
RATE_FORMAT.format(produceRate),
THROUGHPUT_FORMAT.format(produceThroughputBps / BYTES_PER_MB),
RATE_FORMAT.format(errorRate),
Expand Down
Loading