Skip to content

feat(tools/perf): log cpu, memory usage and min latency #2610

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,7 @@ project(':tools') {
exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
implementation libs.bucket4j
implementation libs.oshi
// AutoMQ inject end

// for SASL/OAUTHBEARER JWT validation
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ versions += [
hdrHistogram:"2.1.12",
nettyTcnativeBoringSsl: "2.0.65.Final",
confluentSchema: "7.4.0",
oshi: "6.8.1",
// AutoMQ inject end

// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
Expand Down Expand Up @@ -312,4 +313,5 @@ libs += [
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
oshi: "com.github.oshi:oshi-core:$versions.oshi",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.apache.kafka.tools.automq.perf;

import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;

public class CpuMonitor {
private final CentralProcessor processor;
private long[] prevTicks;

public CpuMonitor() {
this.processor = new SystemInfo().getHardware().getProcessor();
this.prevTicks = processor.getSystemCpuLoadTicks();
}

/**
* Returns the CPU usage between the last call of this method and now.
* It returns -1.0 if an error occurs.
*
* @return CPU load between 0 and 1 (100%)
*/
public synchronized double usage() {
try {
return usage0();
} catch (Exception e) {
return -1.0;
}
}

private double usage0() {
long[] currTicks = processor.getSystemCpuLoadTicks();
double usage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
prevTicks = currTicks;
return usage;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.kafka.tools.automq.perf;

import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.List;

public class MemoryMonitor {

/**
* Returns the amount of heap memory used by the JVM in bytes.
*/
public static long heapUsed() {
return Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
}

/**
* Returns the amount of direct memory used by the JVM in bytes.
*/
public static long directUsed() {
List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
return pools.stream()
.filter(p -> "direct".equals(p.getName()))
.mapToLong(BufferPoolMXBean::getMemoryUsed)
.sum();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -95,7 +96,9 @@ public PerfConfig(String[] args) {
recordSize = ns.getInt("recordSize");
randomRatio = ns.getDouble("randomRatio");
randomPoolSize = ns.getInt("randomPoolSize");
sendRate = ns.getInt("sendRate");
sendRate = Optional.ofNullable(ns.getDouble("sendThroughput"))
.map(throughput -> (int) (throughput * 1024 * 1024 / recordSize))
.orElse(ns.getInt("sendRate"));
sendRateDuringCatchup = ns.getInt("sendRateDuringCatchup") == null ? sendRate : ns.getInt("sendRateDuringCatchup");
maxConsumeRecordRate = ns.getInt("maxConsumeRecordRate");
backlogDurationSeconds = ns.getInt("backlogDurationSeconds");
Expand Down Expand Up @@ -209,6 +212,11 @@ public static ArgumentParser parser() {
.dest("sendRate")
.metavar("SEND_RATE")
.help("The send rate in messages per second.");
parser.addArgument("-f", "--send-throughput")
.type(Double.class)
.dest("sendThroughput")
.metavar("SEND_THROUGHPUT")
.help("The send throughput in MB/s. If not set, the send rate will be used. This is an alternative to --send-rate.");
parser.addArgument("-a", "--send-rate-during-catchup")
.type(positiveInteger())
.dest("sendRateDuringCatchup")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,32 +34,39 @@ public class StatsCollector {
private static final double BYTES_PER_MB = 1 << 20;
private static final double BYTES_PER_GB = 1 << 30;

// max to 9999.9 (167 min)
// max to 9999.9 s (167 min)
private static final DecimalFormat DURATION_FORMAT = new PaddingDecimalFormat("0.0", 6);
// max to 999999.99 (1M msg/s)
// max to 99.99%
private static final DecimalFormat PERCENT_FORMAT = new PaddingDecimalFormat("0.00", 5);
// max to 99999 MiB (100 GiB)
private static final DecimalFormat MEMORY_FORMAT = new PaddingDecimalFormat("0", 5);
// max to 999999.99 msg/s (1M msg/s)
private static final DecimalFormat RATE_FORMAT = new PaddingDecimalFormat("0.00", 9);
// max to 999.99 (1 GB/s)
// max to 999.99 MiB/s (1 GiB/s)
private static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.00", 6);
// max to 99999.999 (100 s)
// max to 99999.999 ms (100 s)
private static final DecimalFormat LATENCY_FORMAT = new PaddingDecimalFormat("0.000", 9);
// max to 999.99
private static final DecimalFormat COUNT_FORMAT = new PaddingDecimalFormat("0.00", 6);

private static final String PERIOD_LOG_FORMAT = "{}s" +
" | CPU {}% | Mem {} MiB heap / {} MiB direct" +
" | Prod rate {} msg/s / {} MiB/s | Prod err {} err/s" +
" | Cons rate {} msg/s / {} MiB/s | Backlog: {} K msg" +
" | Prod Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}" +
" | E2E Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}";
" | Prod Latency (ms) avg: {} - min: {} - 50%: {} - 99%: {} - 99.9%: {} - max: {}" +
" | E2E Latency (ms) avg: {} - min: {} - 50%: {} - 99%: {} - 99.9%: {} - max: {}";
private static final String SUMMARY_LOG_FORMAT = "Summary" +
" | Prod rate {} msg/s / {} MiB/s | Prod total {} M msg / {} GiB / {} K err" +
" | Cons rate {} msg/s / {} MiB/s | Cons total {} M msg / {} GiB" +
" | Prod Latency (ms) avg: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}" +
" | E2E Latency (ms) avg: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}";
" | Prod Latency (ms) avg: {} - min: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - max: {}" +
" | E2E Latency (ms) avg: {} - min: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - max: {}";

private static final Logger LOGGER = LoggerFactory.getLogger(StatsCollector.class);

public static Result printAndCollectStats(Stats stats, StopCondition condition, long intervalNanos,
PerfConfig config) {
final long start = System.nanoTime();
CpuMonitor cpu = new CpuMonitor();
Result result = new Result(config);

long last = start;
Expand All @@ -74,7 +81,7 @@ public static Result printAndCollectStats(Stats stats, StopCondition condition,
double elapsed = (periodStats.nowNanos - last) / NANOS_PER_SEC;
double elapsedTotal = (periodStats.nowNanos - start) / NANOS_PER_SEC;

PeriodResult periodResult = new PeriodResult(periodStats, elapsed, config.groupsPerTopic);
PeriodResult periodResult = new PeriodResult(cpu, periodStats, elapsed, config.groupsPerTopic);
result.update(periodResult, elapsedTotal);
periodResult.logIt(elapsedTotal);

Expand Down Expand Up @@ -144,6 +151,7 @@ public static class Result {
public final List<Double> consumeThroughputBps = new ArrayList<>();
public final List<Long> backlog = new ArrayList<>();
public final List<Double> produceLatencyMeanMicros = new ArrayList<>();
public final List<Double> produceLatencyMinMicros = new ArrayList<>();
public final List<Double> produceLatency50thMicros = new ArrayList<>();
public final List<Double> produceLatency75thMicros = new ArrayList<>();
public final List<Double> produceLatency90thMicros = new ArrayList<>();
Expand All @@ -154,6 +162,7 @@ public static class Result {
public final List<Double> produceLatencyMaxMicros = new ArrayList<>();
public Map<Double, Long> produceLatencyQuantilesMicros;
public final List<Double> endToEndLatencyMeanMicros = new ArrayList<>();
public final List<Double> endToEndLatencyMinMicros = new ArrayList<>();
public final List<Double> endToEndLatency50thMicros = new ArrayList<>();
public final List<Double> endToEndLatency75thMicros = new ArrayList<>();
public final List<Double> endToEndLatency90thMicros = new ArrayList<>();
Expand All @@ -178,6 +187,7 @@ private void update(PeriodResult periodResult, double elapsedTotal) {
this.consumeThroughputBps.add(periodResult.consumeThroughputBps);
this.backlog.add(periodResult.backlog);
this.produceLatencyMeanMicros.add(periodResult.produceLatencyMeanMicros);
this.produceLatencyMinMicros.add(periodResult.produceLatencyMinMicros);
this.produceLatency50thMicros.add(periodResult.produceLatency50thMicros);
this.produceLatency75thMicros.add(periodResult.produceLatency75thMicros);
this.produceLatency90thMicros.add(periodResult.produceLatency90thMicros);
Expand All @@ -187,6 +197,7 @@ private void update(PeriodResult periodResult, double elapsedTotal) {
this.produceLatency9999thMicros.add(periodResult.produceLatency9999thMicros);
this.produceLatencyMaxMicros.add(periodResult.produceLatencyMaxMicros);
this.endToEndLatencyMeanMicros.add(periodResult.endToEndLatencyMeanMicros);
this.endToEndLatencyMinMicros.add(periodResult.endToEndLatencyMinMicros);
this.endToEndLatency50thMicros.add(periodResult.endToEndLatency50thMicros);
this.endToEndLatency75thMicros.add(periodResult.endToEndLatency75thMicros);
this.endToEndLatency90thMicros.add(periodResult.endToEndLatency90thMicros);
Expand Down Expand Up @@ -231,13 +242,17 @@ private void update(CumulativeResult cumulativeResult) {
}

private static class PeriodResult {
private final double cpuUsage;
private final long heapMemoryUsed;
private final long directMemoryUsed;
private final double produceRate;
private final double produceThroughputBps;
private final double errorRate;
private final double consumeRate;
private final double consumeThroughputBps;
private final long backlog;
private final double produceLatencyMeanMicros;
private final double produceLatencyMinMicros;
private final double produceLatency50thMicros;
private final double produceLatency75thMicros;
private final double produceLatency90thMicros;
Expand All @@ -247,6 +262,7 @@ private static class PeriodResult {
private final double produceLatency9999thMicros;
private final double produceLatencyMaxMicros;
private final double endToEndLatencyMeanMicros;
private final double endToEndLatencyMinMicros;
private final double endToEndLatency50thMicros;
private final double endToEndLatency75thMicros;
private final double endToEndLatency90thMicros;
Expand All @@ -256,14 +272,18 @@ private static class PeriodResult {
private final double endToEndLatency9999thMicros;
private final double endToEndLatencyMaxMicros;

private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
private PeriodResult(CpuMonitor cpu, PeriodStats stats, double elapsed, int readWriteRatio) {
this.cpuUsage = cpu.usage();
this.heapMemoryUsed = MemoryMonitor.heapUsed();
this.directMemoryUsed = MemoryMonitor.directUsed();
this.produceRate = stats.messagesSent / elapsed;
this.produceThroughputBps = stats.bytesSent / elapsed;
this.errorRate = stats.messagesSendFailed / elapsed;
this.consumeRate = stats.messagesReceived / elapsed;
this.consumeThroughputBps = stats.bytesReceived / elapsed;
this.backlog = Math.max(0, readWriteRatio * stats.totalMessagesSent - stats.totalMessagesReceived);
this.produceLatencyMeanMicros = stats.sendLatencyMicros.getMean();
this.produceLatencyMinMicros = stats.sendLatencyMicros.getMinValue();
this.produceLatency50thMicros = stats.sendLatencyMicros.getValueAtPercentile(50);
this.produceLatency75thMicros = stats.sendLatencyMicros.getValueAtPercentile(75);
this.produceLatency90thMicros = stats.sendLatencyMicros.getValueAtPercentile(90);
Expand All @@ -273,6 +293,7 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
this.produceLatency9999thMicros = stats.sendLatencyMicros.getValueAtPercentile(99.99);
this.produceLatencyMaxMicros = stats.sendLatencyMicros.getMaxValue();
this.endToEndLatencyMeanMicros = stats.endToEndLatencyMicros.getMean();
this.endToEndLatencyMinMicros = stats.endToEndLatencyMicros.getMinValue();
this.endToEndLatency50thMicros = stats.endToEndLatencyMicros.getValueAtPercentile(50);
this.endToEndLatency75thMicros = stats.endToEndLatencyMicros.getValueAtPercentile(75);
this.endToEndLatency90thMicros = stats.endToEndLatencyMicros.getValueAtPercentile(90);
Expand All @@ -286,18 +307,23 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
private void logIt(double elapsedTotal) {
LOGGER.info(PERIOD_LOG_FORMAT,
DURATION_FORMAT.format(elapsedTotal),
PERCENT_FORMAT.format(cpuUsage * 100),
MEMORY_FORMAT.format(heapMemoryUsed / BYTES_PER_MB),
MEMORY_FORMAT.format(directMemoryUsed / BYTES_PER_MB),
RATE_FORMAT.format(produceRate),
THROUGHPUT_FORMAT.format(produceThroughputBps / BYTES_PER_MB),
RATE_FORMAT.format(errorRate),
RATE_FORMAT.format(consumeRate),
THROUGHPUT_FORMAT.format(consumeThroughputBps / BYTES_PER_MB),
COUNT_FORMAT.format(backlog / 1_000.0),
LATENCY_FORMAT.format(produceLatencyMeanMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatencyMinMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatency50thMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatency99thMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatency999thMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatencyMaxMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatencyMeanMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatencyMinMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatency50thMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatency99thMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatency999thMicros / MICROS_PER_MILLI),
Expand All @@ -317,6 +343,7 @@ private static class CumulativeResult {
private final double consumeCountTotal;
private final double consumeSizeTotalBytes;
private final double produceLatencyMeanTotalMicros;
private final double produceLatencyMinTotalMicros;
private final double produceLatency50thTotalMicros;
private final double produceLatency75thTotalMicros;
private final double produceLatency90thTotalMicros;
Expand All @@ -327,6 +354,7 @@ private static class CumulativeResult {
private final double produceLatencyMaxTotalMicros;
public final Map<Double, Long> produceLatencyQuantilesMicros = new TreeMap<>();
private final double endToEndLatencyMeanTotalMicros;
private final double endToEndLatencyMinTotalMicros;
private final double endToEndLatency50thTotalMicros;
private final double endToEndLatency75thTotalMicros;
private final double endToEndLatency90thTotalMicros;
Expand All @@ -348,6 +376,7 @@ private CumulativeResult(CumulativeStats stats, double elapsedTotal) {
consumeCountTotal = stats.totalMessagesReceived;
consumeSizeTotalBytes = stats.totalBytesReceived;
produceLatencyMeanTotalMicros = stats.totalSendLatencyMicros.getMean();
produceLatencyMinTotalMicros = stats.totalSendLatencyMicros.getMinValue();
produceLatency50thTotalMicros = stats.totalSendLatencyMicros.getValueAtPercentile(50);
produceLatency75thTotalMicros = stats.totalSendLatencyMicros.getValueAtPercentile(75);
produceLatency90thTotalMicros = stats.totalSendLatencyMicros.getValueAtPercentile(90);
Expand All @@ -360,6 +389,7 @@ private CumulativeResult(CumulativeStats stats, double elapsedTotal) {
value -> produceLatencyQuantilesMicros.put(value.getPercentile(), value.getValueIteratedTo())
);
endToEndLatencyMeanTotalMicros = stats.totalEndToEndLatencyMicros.getMean();
endToEndLatencyMinTotalMicros = stats.totalEndToEndLatencyMicros.getMinValue();
endToEndLatency50thTotalMicros = stats.totalEndToEndLatencyMicros.getValueAtPercentile(50);
endToEndLatency75thTotalMicros = stats.totalEndToEndLatencyMicros.getValueAtPercentile(75);
endToEndLatency90thTotalMicros = stats.totalEndToEndLatencyMicros.getValueAtPercentile(90);
Expand All @@ -385,6 +415,7 @@ private void logIt() {
COUNT_FORMAT.format(consumeCountTotal / 1_000_000.0),
COUNT_FORMAT.format(consumeSizeTotalBytes / BYTES_PER_GB),
LATENCY_FORMAT.format(produceLatencyMeanTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatencyMinTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatency50thTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatency75thTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatency90thTotalMicros / MICROS_PER_MILLI),
Expand All @@ -394,6 +425,7 @@ private void logIt() {
LATENCY_FORMAT.format(produceLatency9999thTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(produceLatencyMaxTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatencyMeanTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatencyMinTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatency50thTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatency75thTotalMicros / MICROS_PER_MILLI),
LATENCY_FORMAT.format(endToEndLatency90thTotalMicros / MICROS_PER_MILLI),
Expand Down
Loading