Skip to content

Commit c2d71b0

Browse files
authored
feat(tools/perf): log cpu, memory usage and min latency (#2612)
* chore(tools/perf): add an option "--send-throughput" Signed-off-by: Ning Yu <ningyu@automq.com> * feat(tools/perf): log cpu and memory usages (#2607) * feat: introduce `CpuMonitor` Signed-off-by: Ning Yu <ningyu@automq.com> * feat: introduce `MemoryMonitor` Signed-off-by: Ning Yu <ningyu@automq.com> * feat(tools/perf): log cpu and memory usages Signed-off-by: Ning Yu <ningyu@automq.com> --------- Signed-off-by: Ning Yu <ningyu@automq.com> * feat(tools/perf): log the min latency (#2608) Signed-off-by: Ning Yu <ningyu@automq.com> --------- Signed-off-by: Ning Yu <ningyu@automq.com>
1 parent 1d768b3 commit c2d71b0

File tree

6 files changed

+115
-11
lines changed

6 files changed

+115
-11
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2253,6 +2253,7 @@ project(':tools') {
22532253
exclude group: 'org.apache.kafka', module: 'kafka-clients'
22542254
}
22552255
implementation libs.bucket4j
2256+
implementation libs.oshi
22562257
// AutoMQ inject end
22572258

22582259
// for SASL/OAUTHBEARER JWT validation

gradle/dependencies.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ versions += [
179179
hdrHistogram:"2.1.12",
180180
nettyTcnativeBoringSsl: "2.0.65.Final",
181181
confluentSchema: "7.4.0",
182+
oshi: "6.8.1",
182183
// AutoMQ inject end
183184

184185
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
@@ -312,4 +313,5 @@ libs += [
312313
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
313314
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
314315
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
316+
oshi: "com.github.oshi:oshi-core:$versions.oshi",
315317
]
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package org.apache.kafka.tools.automq.perf;
2+
3+
import oshi.SystemInfo;
4+
import oshi.hardware.CentralProcessor;
5+
6+
public class CpuMonitor {
7+
private final CentralProcessor processor;
8+
private long[] prevTicks;
9+
10+
public CpuMonitor() {
11+
this.processor = new SystemInfo().getHardware().getProcessor();
12+
this.prevTicks = processor.getSystemCpuLoadTicks();
13+
}
14+
15+
/**
16+
* Returns the CPU usage between the last call of this method and now.
17+
* It returns -1.0 if an error occurs.
18+
*
19+
* @return CPU load between 0 and 1 (100%)
20+
*/
21+
public synchronized double usage() {
22+
try {
23+
return usage0();
24+
} catch (Exception e) {
25+
return -1.0;
26+
}
27+
}
28+
29+
private double usage0() {
30+
long[] currTicks = processor.getSystemCpuLoadTicks();
31+
double usage = processor.getSystemCpuLoadBetweenTicks(prevTicks);
32+
prevTicks = currTicks;
33+
return usage;
34+
}
35+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.apache.kafka.tools.automq.perf;
2+
3+
import java.lang.management.BufferPoolMXBean;
4+
import java.lang.management.ManagementFactory;
5+
import java.util.List;
6+
7+
/**
 * Static helpers that report how much memory the running JVM currently uses.
 */
public class MemoryMonitor {

    /**
     * Returns the amount of heap memory used by the JVM in bytes.
     *
     * @return the runtime's total memory minus its free memory, in bytes
     */
    public static long heapUsed() {
        Runtime runtime = Runtime.getRuntime();
        return runtime.totalMemory() - runtime.freeMemory();
    }

    /**
     * Returns the amount of direct memory used by the JVM in bytes.
     *
     * <p>The value is the sum over every platform buffer pool named {@code "direct"}.
     * {@link BufferPoolMXBean#getMemoryUsed()} reports {@code -1} when no estimate is
     * available; such readings are skipped so the result is never negative.
     *
     * @return the non-negative number of bytes used by direct buffers, or 0 if no
     *         estimate is available
     */
    public static long directUsed() {
        List<BufferPoolMXBean> pools = ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        return pools.stream()
            .filter(pool -> "direct".equals(pool.getName()))
            .mapToLong(BufferPoolMXBean::getMemoryUsed)
            // -1 is the "estimate not available" sentinel, not a byte count.
            .filter(used -> used >= 0)
            .sum();
    }
}

tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.HashMap;
3838
import java.util.List;
3939
import java.util.Map;
40+
import java.util.Optional;
4041
import java.util.Properties;
4142
import java.util.Random;
4243
import java.util.concurrent.ThreadLocalRandom;
@@ -105,7 +106,9 @@ public PerfConfig(String[] args) {
105106
recordSize = ns.getInt("recordSize");
106107
randomRatio = ns.getDouble("randomRatio");
107108
randomPoolSize = ns.getInt("randomPoolSize");
108-
sendRate = ns.getInt("sendRate");
109+
sendRate = Optional.ofNullable(ns.getDouble("sendThroughput"))
110+
.map(throughput -> (int) (throughput * 1024 * 1024 / recordSize))
111+
.orElse(ns.getInt("sendRate"));
109112
sendRateDuringCatchup = ns.getInt("sendRateDuringCatchup") == null ? sendRate : ns.getInt("sendRateDuringCatchup");
110113
maxConsumeRecordRate = ns.getInt("maxConsumeRecordRate");
111114
backlogDurationSeconds = ns.getInt("backlogDurationSeconds");
@@ -225,6 +228,11 @@ public static ArgumentParser parser() {
225228
.dest("sendRate")
226229
.metavar("SEND_RATE")
227230
.help("The send rate in messages per second.");
231+
parser.addArgument("-f", "--send-throughput")
232+
.type(Double.class)
233+
.dest("sendThroughput")
234+
.metavar("SEND_THROUGHPUT")
235+
.help("The send throughput in MB/s. If not set, the send rate will be used. This is an alternative to --send-rate.");
228236
parser.addArgument("-a", "--send-rate-during-catchup")
229237
.type(positiveInteger())
230238
.dest("sendRateDuringCatchup")

tools/src/main/java/org/apache/kafka/tools/automq/perf/StatsCollector.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,32 +42,39 @@ public class StatsCollector {
4242
private static final double BYTES_PER_MB = 1 << 20;
4343
private static final double BYTES_PER_GB = 1 << 30;
4444

45-
// max to 9999.9 (167 min)
45+
// max to 9999.9 s (167 min)
4646
private static final DecimalFormat DURATION_FORMAT = new PaddingDecimalFormat("0.0", 6);
47-
// max to 999999.99 (1M msg/s)
47+
// max to 99.99%
48+
private static final DecimalFormat PERCENT_FORMAT = new PaddingDecimalFormat("0.00", 5);
49+
// max to 99999 MiB (100 GiB)
50+
private static final DecimalFormat MEMORY_FORMAT = new PaddingDecimalFormat("0", 5);
51+
// max to 999999.99 msg/s (1M msg/s)
4852
private static final DecimalFormat RATE_FORMAT = new PaddingDecimalFormat("0.00", 9);
49-
// max to 999.99 (1 GB/s)
53+
// max to 999.99 MiB/s (1 GiB/s)
5054
private static final DecimalFormat THROUGHPUT_FORMAT = new PaddingDecimalFormat("0.00", 6);
51-
// max to 99999.999 (100 s)
55+
// max to 99999.999 ms (100 s)
5256
private static final DecimalFormat LATENCY_FORMAT = new PaddingDecimalFormat("0.000", 9);
57+
// max to 999.99
5358
private static final DecimalFormat COUNT_FORMAT = new PaddingDecimalFormat("0.00", 6);
5459

5560
private static final String PERIOD_LOG_FORMAT = "{}s" +
61+
" | CPU {}% | Mem {} MiB heap / {} MiB direct" +
5662
" | Prod rate {} msg/s / {} MiB/s | Prod err {} err/s" +
5763
" | Cons rate {} msg/s / {} MiB/s | Backlog: {} K msg" +
58-
" | Prod Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}" +
59-
" | E2E Latency (ms) avg: {} - 50%: {} - 99%: {} - 99.9%: {} - Max: {}";
64+
" | Prod Latency (ms) avg: {} - min: {} - 50%: {} - 99%: {} - 99.9%: {} - max: {}" +
65+
" | E2E Latency (ms) avg: {} - min: {} - 50%: {} - 99%: {} - 99.9%: {} - max: {}";
6066
private static final String SUMMARY_LOG_FORMAT = "Summary" +
6167
" | Prod rate {} msg/s / {} MiB/s | Prod total {} M msg / {} GiB / {} K err" +
6268
" | Cons rate {} msg/s / {} MiB/s | Cons total {} M msg / {} GiB" +
63-
" | Prod Latency (ms) avg: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}" +
64-
" | E2E Latency (ms) avg: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}";
69+
" | Prod Latency (ms) avg: {} - min: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - max: {}" +
70+
" | E2E Latency (ms) avg: {} - min: {} - 50%: {} - 75%: {} - 90%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - max: {}";
6571

6672
private static final Logger LOGGER = LoggerFactory.getLogger(StatsCollector.class);
6773

6874
public static Result printAndCollectStats(Stats stats, StopCondition condition, long intervalNanos,
6975
PerfConfig config) {
7076
final long start = System.nanoTime();
77+
CpuMonitor cpu = new CpuMonitor();
7178
Result result = new Result(config);
7279

7380
long last = start;
@@ -82,7 +89,7 @@ public static Result printAndCollectStats(Stats stats, StopCondition condition,
8289
double elapsed = (periodStats.nowNanos - last) / NANOS_PER_SEC;
8390
double elapsedTotal = (periodStats.nowNanos - start) / NANOS_PER_SEC;
8491

85-
PeriodResult periodResult = new PeriodResult(periodStats, elapsed, config.groupsPerTopic);
92+
PeriodResult periodResult = new PeriodResult(cpu, periodStats, elapsed, config.groupsPerTopic);
8693
result.update(periodResult, elapsedTotal);
8794
periodResult.logIt(elapsedTotal);
8895

@@ -152,6 +159,7 @@ public static class Result {
152159
public final List<Double> consumeThroughputBps = new ArrayList<>();
153160
public final List<Long> backlog = new ArrayList<>();
154161
public final List<Double> produceLatencyMeanMicros = new ArrayList<>();
162+
public final List<Double> produceLatencyMinMicros = new ArrayList<>();
155163
public final List<Double> produceLatency50thMicros = new ArrayList<>();
156164
public final List<Double> produceLatency75thMicros = new ArrayList<>();
157165
public final List<Double> produceLatency90thMicros = new ArrayList<>();
@@ -162,6 +170,7 @@ public static class Result {
162170
public final List<Double> produceLatencyMaxMicros = new ArrayList<>();
163171
public Map<Double, Long> produceLatencyQuantilesMicros;
164172
public final List<Double> endToEndLatencyMeanMicros = new ArrayList<>();
173+
public final List<Double> endToEndLatencyMinMicros = new ArrayList<>();
165174
public final List<Double> endToEndLatency50thMicros = new ArrayList<>();
166175
public final List<Double> endToEndLatency75thMicros = new ArrayList<>();
167176
public final List<Double> endToEndLatency90thMicros = new ArrayList<>();
@@ -186,6 +195,7 @@ private void update(PeriodResult periodResult, double elapsedTotal) {
186195
this.consumeThroughputBps.add(periodResult.consumeThroughputBps);
187196
this.backlog.add(periodResult.backlog);
188197
this.produceLatencyMeanMicros.add(periodResult.produceLatencyMeanMicros);
198+
this.produceLatencyMinMicros.add(periodResult.produceLatencyMinMicros);
189199
this.produceLatency50thMicros.add(periodResult.produceLatency50thMicros);
190200
this.produceLatency75thMicros.add(periodResult.produceLatency75thMicros);
191201
this.produceLatency90thMicros.add(periodResult.produceLatency90thMicros);
@@ -195,6 +205,7 @@ private void update(PeriodResult periodResult, double elapsedTotal) {
195205
this.produceLatency9999thMicros.add(periodResult.produceLatency9999thMicros);
196206
this.produceLatencyMaxMicros.add(periodResult.produceLatencyMaxMicros);
197207
this.endToEndLatencyMeanMicros.add(periodResult.endToEndLatencyMeanMicros);
208+
this.endToEndLatencyMinMicros.add(periodResult.endToEndLatencyMinMicros);
198209
this.endToEndLatency50thMicros.add(periodResult.endToEndLatency50thMicros);
199210
this.endToEndLatency75thMicros.add(periodResult.endToEndLatency75thMicros);
200211
this.endToEndLatency90thMicros.add(periodResult.endToEndLatency90thMicros);
@@ -239,13 +250,17 @@ private void update(CumulativeResult cumulativeResult) {
239250
}
240251

241252
private static class PeriodResult {
253+
private final double cpuUsage;
254+
private final long heapMemoryUsed;
255+
private final long directMemoryUsed;
242256
private final double produceRate;
243257
private final double produceThroughputBps;
244258
private final double errorRate;
245259
private final double consumeRate;
246260
private final double consumeThroughputBps;
247261
private final long backlog;
248262
private final double produceLatencyMeanMicros;
263+
private final double produceLatencyMinMicros;
249264
private final double produceLatency50thMicros;
250265
private final double produceLatency75thMicros;
251266
private final double produceLatency90thMicros;
@@ -255,6 +270,7 @@ private static class PeriodResult {
255270
private final double produceLatency9999thMicros;
256271
private final double produceLatencyMaxMicros;
257272
private final double endToEndLatencyMeanMicros;
273+
private final double endToEndLatencyMinMicros;
258274
private final double endToEndLatency50thMicros;
259275
private final double endToEndLatency75thMicros;
260276
private final double endToEndLatency90thMicros;
@@ -264,14 +280,18 @@ private static class PeriodResult {
264280
private final double endToEndLatency9999thMicros;
265281
private final double endToEndLatencyMaxMicros;
266282

267-
private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
283+
private PeriodResult(CpuMonitor cpu, PeriodStats stats, double elapsed, int readWriteRatio) {
284+
this.cpuUsage = cpu.usage();
285+
this.heapMemoryUsed = MemoryMonitor.heapUsed();
286+
this.directMemoryUsed = MemoryMonitor.directUsed();
268287
this.produceRate = stats.messagesSent / elapsed;
269288
this.produceThroughputBps = stats.bytesSent / elapsed;
270289
this.errorRate = stats.messagesSendFailed / elapsed;
271290
this.consumeRate = stats.messagesReceived / elapsed;
272291
this.consumeThroughputBps = stats.bytesReceived / elapsed;
273292
this.backlog = Math.max(0, readWriteRatio * stats.totalMessagesSent - stats.totalMessagesReceived);
274293
this.produceLatencyMeanMicros = stats.sendLatencyMicros.getMean();
294+
this.produceLatencyMinMicros = stats.sendLatencyMicros.getMinValue();
275295
this.produceLatency50thMicros = stats.sendLatencyMicros.getValueAtPercentile(50);
276296
this.produceLatency75thMicros = stats.sendLatencyMicros.getValueAtPercentile(75);
277297
this.produceLatency90thMicros = stats.sendLatencyMicros.getValueAtPercentile(90);
@@ -281,6 +301,7 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
281301
this.produceLatency9999thMicros = stats.sendLatencyMicros.getValueAtPercentile(99.99);
282302
this.produceLatencyMaxMicros = stats.sendLatencyMicros.getMaxValue();
283303
this.endToEndLatencyMeanMicros = stats.endToEndLatencyMicros.getMean();
304+
this.endToEndLatencyMinMicros = stats.endToEndLatencyMicros.getMinValue();
284305
this.endToEndLatency50thMicros = stats.endToEndLatencyMicros.getValueAtPercentile(50);
285306
this.endToEndLatency75thMicros = stats.endToEndLatencyMicros.getValueAtPercentile(75);
286307
this.endToEndLatency90thMicros = stats.endToEndLatencyMicros.getValueAtPercentile(90);
@@ -294,18 +315,23 @@ private PeriodResult(PeriodStats stats, double elapsed, int readWriteRatio) {
294315
private void logIt(double elapsedTotal) {
295316
LOGGER.info(PERIOD_LOG_FORMAT,
296317
DURATION_FORMAT.format(elapsedTotal),
318+
PERCENT_FORMAT.format(cpuUsage * 100),
319+
MEMORY_FORMAT.format(heapMemoryUsed / BYTES_PER_MB),
320+
MEMORY_FORMAT.format(directMemoryUsed / BYTES_PER_MB),
297321
RATE_FORMAT.format(produceRate),
298322
THROUGHPUT_FORMAT.format(produceThroughputBps / BYTES_PER_MB),
299323
RATE_FORMAT.format(errorRate),
300324
RATE_FORMAT.format(consumeRate),
301325
THROUGHPUT_FORMAT.format(consumeThroughputBps / BYTES_PER_MB),
302326
COUNT_FORMAT.format(backlog / 1_000.0),
303327
LATENCY_FORMAT.format(produceLatencyMeanMicros / MICROS_PER_MILLI),
328+
LATENCY_FORMAT.format(produceLatencyMinMicros / MICROS_PER_MILLI),
304329
LATENCY_FORMAT.format(produceLatency50thMicros / MICROS_PER_MILLI),
305330
LATENCY_FORMAT.format(produceLatency99thMicros / MICROS_PER_MILLI),
306331
LATENCY_FORMAT.format(produceLatency999thMicros / MICROS_PER_MILLI),
307332
LATENCY_FORMAT.format(produceLatencyMaxMicros / MICROS_PER_MILLI),
308333
LATENCY_FORMAT.format(endToEndLatencyMeanMicros / MICROS_PER_MILLI),
334+
LATENCY_FORMAT.format(endToEndLatencyMinMicros / MICROS_PER_MILLI),
309335
LATENCY_FORMAT.format(endToEndLatency50thMicros / MICROS_PER_MILLI),
310336
LATENCY_FORMAT.format(endToEndLatency99thMicros / MICROS_PER_MILLI),
311337
LATENCY_FORMAT.format(endToEndLatency999thMicros / MICROS_PER_MILLI),
@@ -325,6 +351,7 @@ private static class CumulativeResult {
325351
private final double consumeCountTotal;
326352
private final double consumeSizeTotalBytes;
327353
private final double produceLatencyMeanTotalMicros;
354+
private final double produceLatencyMinTotalMicros;
328355
private final double produceLatency50thTotalMicros;
329356
private final double produceLatency75thTotalMicros;
330357
private final double produceLatency90thTotalMicros;
@@ -335,6 +362,7 @@ private static class CumulativeResult {
335362
private final double produceLatencyMaxTotalMicros;
336363
public final Map<Double, Long> produceLatencyQuantilesMicros = new TreeMap<>();
337364
private final double endToEndLatencyMeanTotalMicros;
365+
private final double endToEndLatencyMinTotalMicros;
338366
private final double endToEndLatency50thTotalMicros;
339367
private final double endToEndLatency75thTotalMicros;
340368
private final double endToEndLatency90thTotalMicros;
@@ -356,6 +384,7 @@ private CumulativeResult(CumulativeStats stats, double elapsedTotal) {
356384
consumeCountTotal = stats.totalMessagesReceived;
357385
consumeSizeTotalBytes = stats.totalBytesReceived;
358386
produceLatencyMeanTotalMicros = stats.totalSendLatencyMicros.getMean();
387+
produceLatencyMinTotalMicros = stats.totalSendLatencyMicros.getMinValue();
359388
produceLatency50thTotalMicros = stats.totalSendLatencyMicros.getValueAtPercentile(50);
360389
produceLatency75thTotalMicros = stats.totalSendLatencyMicros.getValueAtPercentile(75);
361390
produceLatency90thTotalMicros = stats.totalSendLatencyMicros.getValueAtPercentile(90);
@@ -368,6 +397,7 @@ private CumulativeResult(CumulativeStats stats, double elapsedTotal) {
368397
value -> produceLatencyQuantilesMicros.put(value.getPercentile(), value.getValueIteratedTo())
369398
);
370399
endToEndLatencyMeanTotalMicros = stats.totalEndToEndLatencyMicros.getMean();
400+
endToEndLatencyMinTotalMicros = stats.totalEndToEndLatencyMicros.getMinValue();
371401
endToEndLatency50thTotalMicros = stats.totalEndToEndLatencyMicros.getValueAtPercentile(50);
372402
endToEndLatency75thTotalMicros = stats.totalEndToEndLatencyMicros.getValueAtPercentile(75);
373403
endToEndLatency90thTotalMicros = stats.totalEndToEndLatencyMicros.getValueAtPercentile(90);
@@ -393,6 +423,7 @@ private void logIt() {
393423
COUNT_FORMAT.format(consumeCountTotal / 1_000_000.0),
394424
COUNT_FORMAT.format(consumeSizeTotalBytes / BYTES_PER_GB),
395425
LATENCY_FORMAT.format(produceLatencyMeanTotalMicros / MICROS_PER_MILLI),
426+
LATENCY_FORMAT.format(produceLatencyMinTotalMicros / MICROS_PER_MILLI),
396427
LATENCY_FORMAT.format(produceLatency50thTotalMicros / MICROS_PER_MILLI),
397428
LATENCY_FORMAT.format(produceLatency75thTotalMicros / MICROS_PER_MILLI),
398429
LATENCY_FORMAT.format(produceLatency90thTotalMicros / MICROS_PER_MILLI),
@@ -402,6 +433,7 @@ private void logIt() {
402433
LATENCY_FORMAT.format(produceLatency9999thTotalMicros / MICROS_PER_MILLI),
403434
LATENCY_FORMAT.format(produceLatencyMaxTotalMicros / MICROS_PER_MILLI),
404435
LATENCY_FORMAT.format(endToEndLatencyMeanTotalMicros / MICROS_PER_MILLI),
436+
LATENCY_FORMAT.format(endToEndLatencyMinTotalMicros / MICROS_PER_MILLI),
405437
LATENCY_FORMAT.format(endToEndLatency50thTotalMicros / MICROS_PER_MILLI),
406438
LATENCY_FORMAT.format(endToEndLatency75thTotalMicros / MICROS_PER_MILLI),
407439
LATENCY_FORMAT.format(endToEndLatency90thTotalMicros / MICROS_PER_MILLI),

0 commit comments

Comments
 (0)