Skip to content

Commit c8d7ceb

Browse files
committed
feat: benchmark fetch indexes
1 parent 1e728f5 commit c8d7ceb

File tree

2 files changed

+130
-0
lines changed

2 files changed

+130
-0
lines changed

benchmarks/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ ext {
2929

3030
dependencies {
3131
implementation project(':core')
32+
implementation project(':storage:s3')
33+
implementation project(':storage:gcs')
34+
implementation project(':storage:azure')
3235
implementation group: "org.apache.kafka", name: "kafka-storage-api", version: kafkaVersion
3336
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion
3437

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.benchs.fetch;
18+
19+
import java.io.IOException;
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.Properties;
25+
import java.util.Set;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import org.apache.kafka.common.TopicIdPartition;
29+
import org.apache.kafka.common.Uuid;
30+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
31+
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
32+
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
33+
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
34+
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
35+
36+
import io.aiven.kafka.tieredstorage.RemoteStorageManager;
37+
import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;
38+
39+
import org.openjdk.jmh.annotations.Benchmark;
40+
import org.openjdk.jmh.annotations.BenchmarkMode;
41+
import org.openjdk.jmh.annotations.Fork;
42+
import org.openjdk.jmh.annotations.Level;
43+
import org.openjdk.jmh.annotations.Measurement;
44+
import org.openjdk.jmh.annotations.Mode;
45+
import org.openjdk.jmh.annotations.OutputTimeUnit;
46+
import org.openjdk.jmh.annotations.Scope;
47+
import org.openjdk.jmh.annotations.Setup;
48+
import org.openjdk.jmh.annotations.State;
49+
import org.openjdk.jmh.annotations.Warmup;
50+
import org.openjdk.jmh.infra.Blackhole;
51+
52+
@State(Scope.Benchmark)
53+
@Fork(value = 1)
54+
@Warmup(iterations = 4, time = 10)
55+
@Measurement(iterations = 16, time = 30)
56+
@BenchmarkMode({Mode.SampleTime})
57+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
58+
public class FetchIndexesBenchmark {
59+
60+
// s3://jeqo-test1/tiered-storage-demo/t1-QW1...-Q/0/00000000000000057362-ABC....indexes
61+
final TopicIdPartition tip =
62+
new TopicIdPartition(Uuid.fromString("QW1-OwYOSt6w-CBTuczz-Q"), 0, "t1");
63+
final RemoteLogSegmentMetadata meta = new RemoteLogSegmentMetadata(
64+
new RemoteLogSegmentId(tip, Uuid.fromString("AYSp9LTtQyqRJF6u5bHdVg")), 57362L, 58302L - 1,
65+
0, 0, 0, 200 * 1024 * 1024, Map.of(0, 0L));
66+
67+
RemoteStorageManager rsm = new RemoteStorageManager();
68+
69+
@Setup(Level.Trial)
70+
public void setup() throws IOException {
71+
final var tmpDir = Files.createTempDirectory("rsm-cache");
72+
final var compression = false;
73+
final var encryption = false;
74+
final var cacheClass = DiskBasedChunkCache.class.getCanonicalName();
75+
// Configure the RSM.
76+
final var cacheDir = tmpDir.resolve("cache");
77+
Files.createDirectories(cacheDir);
78+
79+
final var props = new Properties();
80+
props.load(Files.newInputStream(Path.of("rsm.properties")));
81+
final Map<String, String> config = new HashMap<>();
82+
props.forEach((k, v) -> config.put((String) k, (String) v));
83+
// 4MiB
84+
final int chunkSize = 4 * 1024 * 1024;
85+
config.putAll(Map.of(
86+
"chunk.size", Integer.toString(chunkSize),
87+
"compression.enabled", Boolean.toString(compression),
88+
"encryption.enabled", Boolean.toString(encryption),
89+
"chunk.cache.class", cacheClass,
90+
"chunk.cache.path", cacheDir.toString(),
91+
"chunk.cache.size", Integer.toString(100 * 1024 * 1024),
92+
"custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY"
93+
));
94+
95+
rsm.configure(config);
96+
}
97+
98+
@Benchmark
99+
public void fetchIndexesV1(final Blackhole b) throws RemoteStorageException {
100+
final var offsetindex = rsm.fetchIndex(meta, IndexType.OFFSET);
101+
final var timeindex = rsm.fetchIndex(meta, IndexType.TIMESTAMP);
102+
try {
103+
final var txnindex = rsm.fetchIndex(meta, IndexType.TRANSACTION);
104+
b.consume(offsetindex);
105+
b.consume(timeindex);
106+
b.consume(txnindex);
107+
} catch (final RemoteResourceNotFoundException e) {
108+
b.consume(offsetindex);
109+
b.consume(timeindex);
110+
}
111+
}
112+
113+
@Benchmark
114+
public void fetchIndexesV2(final Blackhole b) throws RemoteStorageException {
115+
final var indexes = rsm.fetchIndexes(
116+
meta,
117+
Set.of(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION));
118+
b.consume(indexes);
119+
}
120+
121+
122+
@Benchmark
123+
public void fetchAllIndexesV2(final Blackhole b) throws RemoteStorageException {
124+
final var indexes = rsm.fetchAllIndexes(meta);
125+
b.consume(indexes);
126+
}
127+
}

0 commit comments

Comments
 (0)