Skip to content

Commit 0bfabf2

Browse files
committed
feat: use transform pipeline
1 parent 870c4c3 commit 0bfabf2

File tree

4 files changed

+35
-110
lines changed

4 files changed

+35
-110
lines changed

core/src/main/java/io/aiven/kafka/tieredstorage/ChunkManager.java

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,39 +19,32 @@
1919
import java.io.ByteArrayInputStream;
2020
import java.io.IOException;
2121
import java.io.InputStream;
22-
import java.util.List;
2322
import java.util.Optional;
2423

2524
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
2625

2726
import io.aiven.kafka.tieredstorage.cache.ChunkCache;
28-
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
2927
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
30-
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
3128
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
3229
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
33-
import io.aiven.kafka.tieredstorage.transform.BaseDetransformChunkEnumeration;
34-
import io.aiven.kafka.tieredstorage.transform.DecompressionChunkEnumeration;
35-
import io.aiven.kafka.tieredstorage.transform.DecryptionChunkEnumeration;
36-
import io.aiven.kafka.tieredstorage.transform.DetransformChunkEnumeration;
37-
import io.aiven.kafka.tieredstorage.transform.DetransformFinisher;
30+
import io.aiven.kafka.tieredstorage.transform.TransformPipeline;
3831

3932
import org.apache.commons.io.IOUtils;
4033

4134
public class ChunkManager {
4235
private final ObjectFetcher fetcher;
4336
private final ObjectKey objectKey;
44-
private final AesEncryptionProvider aesEncryptionProvider;
4537
private final ChunkCache chunkCache;
38+
private final TransformPipeline transformPipeline;
4639

4740
public ChunkManager(final ObjectFetcher fetcher,
4841
final ObjectKey objectKey,
49-
final AesEncryptionProvider aesEncryptionProvider,
50-
final ChunkCache chunkCache) {
42+
final ChunkCache chunkCache,
43+
final TransformPipeline transformPipeline) {
5144
this.fetcher = fetcher;
5245
this.objectKey = objectKey;
53-
this.aesEncryptionProvider = aesEncryptionProvider;
5446
this.chunkCache = chunkCache;
47+
this.transformPipeline = transformPipeline;
5548
}
5649

5750
/**
@@ -63,19 +56,8 @@ public InputStream getChunk(final RemoteLogSegmentMetadata remoteLogSegmentMetad
6356
final int chunkId) throws StorageBackendException {
6457
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
6558
final InputStream chunkContent = getChunkContent(remoteLogSegmentMetadata, chunk);
66-
DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
67-
final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
68-
if (encryptionMetadata.isPresent()) {
69-
detransformEnum = new DecryptionChunkEnumeration(
70-
detransformEnum,
71-
encryptionMetadata.get().ivSize(),
72-
encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
73-
);
74-
}
75-
if (manifest.compression()) {
76-
detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
77-
}
78-
final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
59+
final var detransformFinisher = transformPipeline.outboundTransformChain(chunkContent, manifest, chunk)
60+
.complete();
7961
return detransformFinisher.toInputStream();
8062
}
8163

core/src/main/java/io/aiven/kafka/tieredstorage/RemoteStorageManager.java

Lines changed: 16 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package io.aiven.kafka.tieredstorage;
1818

19-
import javax.crypto.SecretKey;
20-
import javax.crypto.spec.SecretKeySpec;
21-
2219
import java.io.ByteArrayInputStream;
2320
import java.io.File;
2421
import java.io.IOException;
@@ -35,31 +32,17 @@
3532
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
3633
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
3734

38-
import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1;
3935
import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
40-
import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1;
41-
import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex;
42-
import io.aiven.kafka.tieredstorage.manifest.serde.DataKeyDeserializer;
43-
import io.aiven.kafka.tieredstorage.manifest.serde.DataKeySerializer;
4436
import io.aiven.kafka.tieredstorage.metrics.Metrics;
45-
import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
46-
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
47-
import io.aiven.kafka.tieredstorage.security.RsaEncryptionProvider;
4837
import io.aiven.kafka.tieredstorage.storage.BytesRange;
4938
import io.aiven.kafka.tieredstorage.storage.ObjectDeleter;
5039
import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
5140
import io.aiven.kafka.tieredstorage.storage.ObjectUploader;
5241
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
53-
import io.aiven.kafka.tieredstorage.transform.BaseTransformChunkEnumeration;
54-
import io.aiven.kafka.tieredstorage.transform.CompressionChunkEnumeration;
55-
import io.aiven.kafka.tieredstorage.transform.EncryptionChunkEnumeration;
5642
import io.aiven.kafka.tieredstorage.transform.FetchChunkEnumeration;
57-
import io.aiven.kafka.tieredstorage.transform.TransformChunkEnumeration;
5843
import io.aiven.kafka.tieredstorage.transform.TransformFinisher;
44+
import io.aiven.kafka.tieredstorage.transform.TransformPipeline;
5945

60-
import com.fasterxml.jackson.databind.ObjectMapper;
61-
import com.fasterxml.jackson.databind.module.SimpleModule;
62-
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
6346
import org.slf4j.Logger;
6447
import org.slf4j.LoggerFactory;
6548

@@ -83,13 +66,9 @@ public class RemoteStorageManager implements org.apache.kafka.server.log.remote.
8366
private ObjectDeleter deleter;
8467
private boolean compressionEnabled;
8568
private boolean compressionHeuristic;
86-
private boolean encryptionEnabled;
87-
private int chunkSize;
88-
private RsaEncryptionProvider rsaEncryptionProvider;
89-
private AesEncryptionProvider aesEncryptionProvider;
90-
private ObjectMapper mapper;
9169
private ChunkManager chunkManager;
9270
private ObjectKey objectKey;
71+
private TransformPipeline transformPipeline;
9372

9473
private SegmentManifestProvider segmentManifestProvider;
9574

@@ -111,49 +90,27 @@ public void configure(final Map<String, ?> configs) {
11190
uploader = config.storage();
11291
deleter = config.storage();
11392
objectKey = new ObjectKey(config.keyPrefix());
114-
encryptionEnabled = config.encryptionEnabled();
115-
if (encryptionEnabled) {
116-
rsaEncryptionProvider = RsaEncryptionProvider.of(
117-
config.encryptionPublicKeyFile(),
118-
config.encryptionPrivateKeyFile()
119-
);
120-
aesEncryptionProvider = new AesEncryptionProvider();
121-
}
122-
chunkManager = new ChunkManager(
123-
fetcher,
124-
objectKey,
125-
aesEncryptionProvider,
126-
config.chunkCache()
127-
);
12893

129-
chunkSize = config.chunkSize();
13094
compressionEnabled = config.compressionEnabled();
13195
compressionHeuristic = config.compressionHeuristicEnabled();
13296

133-
mapper = getObjectMapper();
97+
transformPipeline = TransformPipeline.newBuilder().fromConfig(config).build();
13498

99+
chunkManager = new ChunkManager(
100+
fetcher,
101+
objectKey,
102+
config.chunkCache(),
103+
transformPipeline
104+
);
135105
segmentManifestProvider = new SegmentManifestProvider(
136106
objectKey,
137107
config.segmentManifestCacheSize(),
138108
config.segmentManifestCacheRetention(),
139109
fetcher,
140-
mapper,
110+
transformPipeline.objectMapper(),
141111
executor);
142112
}
143113

144-
private ObjectMapper getObjectMapper() {
145-
final ObjectMapper objectMapper = new ObjectMapper();
146-
objectMapper.registerModule(new Jdk8Module());
147-
if (encryptionEnabled) {
148-
final SimpleModule simpleModule = new SimpleModule();
149-
simpleModule.addSerializer(SecretKey.class, new DataKeySerializer(rsaEncryptionProvider::encryptDataKey));
150-
simpleModule.addDeserializer(SecretKey.class, new DataKeyDeserializer(
151-
b -> new SecretKeySpec(rsaEncryptionProvider.decryptDataKey(b), "AES")));
152-
objectMapper.registerModule(simpleModule);
153-
}
154-
return objectMapper;
155-
}
156-
157114
@Override
158115
public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
159116
final LogSegmentData logSegmentData) throws RemoteStorageException {
@@ -165,27 +122,11 @@ public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMe
165122
final long startedMs = time.milliseconds();
166123

167124
try {
168-
TransformChunkEnumeration transformEnum = new BaseTransformChunkEnumeration(
169-
Files.newInputStream(logSegmentData.logSegment()), chunkSize);
170-
SegmentEncryptionMetadataV1 encryptionMetadata = null;
171-
final boolean requiresCompression = requiresCompression(logSegmentData);
172-
if (requiresCompression) {
173-
transformEnum = new CompressionChunkEnumeration(transformEnum);
174-
}
175-
if (encryptionEnabled) {
176-
final DataKeyAndAAD dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
177-
transformEnum = new EncryptionChunkEnumeration(
178-
transformEnum,
179-
() -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD));
180-
encryptionMetadata = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
181-
}
182-
final TransformFinisher transformFinisher =
183-
new TransformFinisher(transformEnum, remoteLogSegmentMetadata.segmentSizeInBytes());
125+
final var inboundTransformChain = transformPipeline.inboundTransformChain(logSegmentData.logSegment());
126+
final var transformFinisher = inboundTransformChain.complete();
184127
uploadSegmentLog(remoteLogSegmentMetadata, transformFinisher);
185128

186-
final ChunkIndex chunkIndex = transformFinisher.chunkIndex();
187-
final SegmentManifest segmentManifest =
188-
new SegmentManifestV1(chunkIndex, requiresCompression, encryptionMetadata);
129+
final SegmentManifest segmentManifest = transformPipeline.segmentManifest(transformFinisher.chunkIndex());
189130
uploadManifest(remoteLogSegmentMetadata, segmentManifest);
190131

191132
final InputStream offsetIndex = Files.newInputStream(logSegmentData.offsetIndex());
@@ -248,10 +189,9 @@ private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMeta
248189
private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
249190
final SegmentManifest segmentManifest)
250191
throws StorageBackendException, IOException {
251-
final String manifest = mapper.writeValueAsString(segmentManifest);
252-
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
253-
254-
try (final ByteArrayInputStream manifestContent = new ByteArrayInputStream(manifest.getBytes())) {
192+
final byte[] manifestBytes = transformPipeline.objectMapper().writeValueAsBytes(segmentManifest);
193+
try (final var manifestContent = new ByteArrayInputStream(manifestBytes)) {
194+
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);
255195
uploader.upload(manifestContent, manifestFileKey);
256196
}
257197
}

core/src/test/java/io/aiven/kafka/tieredstorage/ChunkManagerTest.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD;
3636
import io.aiven.kafka.tieredstorage.storage.StorageBackend;
3737
import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
38+
import io.aiven.kafka.tieredstorage.transform.TransformPipeline;
3839

3940
import com.github.luben.zstd.ZstdCompressCtx;
4041
import org.junit.jupiter.api.BeforeEach;
@@ -51,7 +52,7 @@
5152
import static org.mockito.Mockito.when;
5253

5354
@ExtendWith(MockitoExtension.class)
54-
class ChunkManagerTest extends AesKeyAwareTest {
55+
class ChunkManagerTest extends RsaKeyAwareTest {
5556

5657
static final byte[] TEST_CHUNK_CONTENT = "0123456789".getBytes();
5758
@Mock
@@ -71,7 +72,8 @@ void testGetChunk() throws StorageBackendException {
7172
final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10);
7273

7374
final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, false, null);
74-
final ChunkManager chunkManager = new ChunkManager(storage, objectKey, null, null);
75+
final TransformPipeline transformPipeline = TransformPipeline.newBuilder().build();
76+
final ChunkManager chunkManager = new ChunkManager(storage, objectKey, null, transformPipeline);
7577
when(storage.fetch("test.log", chunkIndex.chunks().get(0).range()))
7678
.thenReturn(new ByteArrayInputStream("0123456789".getBytes()));
7779

@@ -95,8 +97,8 @@ void testGetChunkWithCaching() throws StorageBackendException {
9597
final ChunkManager chunkManager = new ChunkManager(
9698
storage,
9799
objectKey,
98-
null,
99-
new UnboundInMemoryChunkCache()
100+
new UnboundInMemoryChunkCache(),
101+
TransformPipeline.newBuilder().build()
100102
);
101103

102104
assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasContent("0123456789");
@@ -132,8 +134,8 @@ void testGetChunkWithEncryption() throws Exception {
132134
final ChunkManager chunkManager = new ChunkManager(
133135
storage,
134136
objectKey,
135-
aesEncryptionProvider,
136-
new UnboundInMemoryChunkCache()
137+
new UnboundInMemoryChunkCache(),
138+
TransformPipeline.newBuilder().withEncryption(publicKeyPem, privateKeyPem).build()
137139
);
138140

139141
assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);
@@ -165,8 +167,8 @@ void testGetChunkWithCompression() throws Exception {
165167
final ChunkManager chunkManager = new ChunkManager(
166168
storage,
167169
objectKey,
168-
null,
169-
new UnboundInMemoryChunkCache()
170+
new UnboundInMemoryChunkCache(),
171+
TransformPipeline.newBuilder().withCompression().build()
170172
);
171173

172174
assertThat(chunkManager.getChunk(remoteLogSegmentMetadata, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT);

core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationSourceInputStreamClosingTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ void setup() {
9696
void test(final ChunkCache chunkCache,
9797
final boolean readFully,
9898
final BytesRange range) throws StorageBackendException, IOException {
99-
final var chunkManager = new ChunkManager(fetcher, objectKey, null, chunkCache);
99+
final TransformPipeline transformPipeline = TransformPipeline.newBuilder().build();
100+
final var chunkManager = new ChunkManager(fetcher, objectKey, chunkCache, transformPipeline);
100101
final var is = new FetchChunkEnumeration(chunkManager, REMOTE_LOG_SEGMENT_METADATA, SEGMENT_MANIFEST, range)
101102
.toInputStream();
102103
if (readFully) {

0 commit comments

Comments
 (0)