Skip to content

Commit 571065c

Browse files
committed
feat: add transform pipeline
1 parent fae0128 commit 571065c

File tree

8 files changed

+339
-38
lines changed

8 files changed

+339
-38
lines changed

checkstyle/suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
2222
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
2323
<suppress checks="ClassFanOutComplexity" files="S3ClientWrapper.java"/>
24+
<suppress checks="ClassFanOutComplexity" files="TransformPipeline.java"/>
2425
<suppress checks="ClassFanOutComplexity" files="UniversalRemoteStorageManager.java"/>
2526
<suppress checks="ClassDataAbstractionCoupling" files="S3ClientWrapper.java"/>
2627
<suppress checks="ClassDataAbstractionCoupling" files="S3RemoteStorageManagerConfig.java"/>
2728
<suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
29+
<suppress checks="ClassDataAbstractionCoupling" files="TransformPipeline.java"/>
2830
<suppress checks="AbbreviationAsWordInName" files="DataKeyAndAADEqualsTest"/>
2931
<suppress checks="ClassDataAbstractionCoupling" files="UniversalRemoteStorageManager.java"/>
3032
</suppressions>

commons/src/main/java/io/aiven/kafka/tieredstorage/commons/UniversalRemoteStorageManagerConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -221,27 +221,27 @@ String keyPrefix() {
221221
return getString(OBJECT_STORAGE_KEY_PREFIX_CONFIG);
222222
}
223223

224-
int chunkSize() {
224+
public int chunkSize() {
225225
return getInt(CHUNK_SIZE_CONFIG);
226226
}
227227

228-
boolean compressionEnabled() {
228+
public boolean compressionEnabled() {
229229
return getBoolean(COMPRESSION_CONFIG);
230230
}
231231

232-
boolean encryptionEnabled() {
232+
public boolean encryptionEnabled() {
233233
return getBoolean(ENCRYPTION_CONFIG);
234234
}
235235

236-
Path encryptionPublicKeyFile() {
236+
public Path encryptionPublicKeyFile() {
237237
final String value = getString(ENCRYPTION_PUBLIC_KEY_FILE_CONFIG);
238238
if (value == null) {
239239
return null;
240240
}
241241
return Path.of(value);
242242
}
243243

244-
Path encryptionPrivateKeyFile() {
244+
public Path encryptionPrivateKeyFile() {
245245
final String value = getString(ENCRYPTION_PRIVATE_KEY_FILE_CONFIG);
246246
if (value == null) {
247247
return null;

commons/src/main/java/io/aiven/kafka/tieredstorage/commons/transform/DetransformFinisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,8 @@ public InputStream nextElement() {
4444
final var chunk = inner.nextElement();
4545
return new ByteArrayInputStream(chunk);
4646
}
47+
48+
public InputStream sequence() {
49+
return new SequenceInputStream(this);
50+
}
4751
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.commons.transform;
18+
19+
import java.io.InputStream;
20+
import java.util.function.Function;
21+
22+
public final class InboundTransformChain {
23+
int originalSize;
24+
TransformChunkEnumeration inner;
25+
26+
public InboundTransformChain(final InputStream content, final int size, final int chunkSize) {
27+
originalSize = size;
28+
this.inner = new BaseTransformChunkEnumeration(content, chunkSize);
29+
}
30+
31+
public void chain(final Function<TransformChunkEnumeration, TransformChunkEnumeration> transformSupplier) {
32+
this.inner = transformSupplier.apply(this.inner);
33+
}
34+
35+
public TransformFinisher complete() {
36+
return new TransformFinisher(inner, originalSize);
37+
}
38+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.commons.transform;
18+
19+
import java.io.InputStream;
20+
import java.util.List;
21+
import java.util.function.Function;
22+
23+
import io.aiven.kafka.tieredstorage.commons.Chunk;
24+
25+
public final class OutboundTransformChain {
26+
DetransformChunkEnumeration inner;
27+
28+
public OutboundTransformChain(final InputStream uploadedData, final List<Chunk> chunks) {
29+
this.inner = new BaseDetransformChunkEnumeration(uploadedData, chunks);
30+
}
31+
32+
public void chain(final Function<DetransformChunkEnumeration, DetransformChunkEnumeration> transform) {
33+
this.inner = transform.apply(this.inner);
34+
}
35+
36+
public DetransformFinisher complete() {
37+
return new DetransformFinisher(inner);
38+
}
39+
}

commons/src/main/java/io/aiven/kafka/tieredstorage/commons/transform/TransformFinisher.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,8 @@ public ChunkIndex chunkIndex() {
8181
}
8282
return this.chunkIndex;
8383
}
84+
85+
public InputStream sequence() {
86+
return new SequenceInputStream(this);
87+
}
8488
}
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.tieredstorage.commons.transform;
18+
19+
import javax.crypto.Cipher;
20+
import javax.crypto.SecretKey;
21+
import javax.crypto.spec.SecretKeySpec;
22+
23+
import java.io.ByteArrayInputStream;
24+
import java.io.IOException;
25+
import java.io.InputStream;
26+
import java.nio.file.Files;
27+
import java.nio.file.Path;
28+
import java.util.List;
29+
import java.util.function.BiFunction;
30+
import java.util.function.Function;
31+
import java.util.function.Supplier;
32+
33+
import io.aiven.kafka.tieredstorage.commons.Chunk;
34+
import io.aiven.kafka.tieredstorage.commons.UniversalRemoteStorageManagerConfig;
35+
import io.aiven.kafka.tieredstorage.commons.manifest.SegmentEncryptionMetadata;
36+
import io.aiven.kafka.tieredstorage.commons.manifest.SegmentEncryptionMetadataV1;
37+
import io.aiven.kafka.tieredstorage.commons.manifest.SegmentManifest;
38+
import io.aiven.kafka.tieredstorage.commons.manifest.SegmentManifestV1;
39+
import io.aiven.kafka.tieredstorage.commons.manifest.serde.DataKeyDeserializer;
40+
import io.aiven.kafka.tieredstorage.commons.manifest.serde.DataKeySerializer;
41+
import io.aiven.kafka.tieredstorage.commons.security.AesEncryptionProvider;
42+
import io.aiven.kafka.tieredstorage.commons.security.DataKeyAndAAD;
43+
import io.aiven.kafka.tieredstorage.commons.security.RsaEncryptionProvider;
44+
45+
import com.fasterxml.jackson.core.JsonProcessingException;
46+
import com.fasterxml.jackson.databind.ObjectMapper;
47+
import com.fasterxml.jackson.databind.module.SimpleModule;
48+
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
49+
50+
public class TransformPipeline {
51+
52+
final int chunkSize;
53+
final boolean withCompression;
54+
final boolean withEncryption;
55+
final DataKeyAndAAD dataKeyAndAAD;
56+
final int ivSize;
57+
final Supplier<Cipher> inboundCipherSupplier;
58+
final BiFunction<byte[], SegmentEncryptionMetadata, Cipher> outboundCipherSupplier;
59+
final ObjectMapper objectMapper;
60+
61+
public TransformPipeline(final int chunkSize,
62+
final boolean withCompression,
63+
final boolean withEncryption,
64+
final DataKeyAndAAD dataKeyAndAAD,
65+
final int ivSize,
66+
final Supplier<Cipher> inboundCipherSupplier,
67+
final BiFunction<byte[], SegmentEncryptionMetadata, Cipher> outboundCipherSupplier,
68+
final ObjectMapper objectMapper) {
69+
this.chunkSize = chunkSize;
70+
this.withCompression = withCompression;
71+
this.withEncryption = withEncryption;
72+
this.dataKeyAndAAD = dataKeyAndAAD;
73+
this.ivSize = ivSize;
74+
this.inboundCipherSupplier = inboundCipherSupplier;
75+
this.outboundCipherSupplier = outboundCipherSupplier;
76+
this.objectMapper = objectMapper;
77+
}
78+
79+
80+
public static TransformPipeline.Builder newBuilder() {
81+
return new Builder();
82+
}
83+
84+
public SegmentManifest segmentManifest(final TransformFinisher transformFinisher) {
85+
SegmentEncryptionMetadataV1 encryption = null;
86+
if (withEncryption) {
87+
encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad);
88+
}
89+
return new SegmentManifestV1(transformFinisher.chunkIndex(), withCompression, encryption);
90+
}
91+
92+
public InboundTransformChain inboundTransformChain(final Path logPath) throws IOException {
93+
return inboundTransformChain(Files.newInputStream(logPath), (int) Files.size(logPath));
94+
}
95+
96+
public InboundTransformChain inboundTransformChain(final InputStream content, final int size) {
97+
final Function<InboundTransformChain, InboundTransformChain> inboundFunction = inboundTransformChain -> {
98+
if (withCompression) {
99+
inboundTransformChain.chain(CompressionChunkEnumeration::new);
100+
}
101+
if (withEncryption) {
102+
inboundTransformChain.chain(inboundTransform ->
103+
new EncryptionChunkEnumeration(inboundTransform, inboundCipherSupplier));
104+
}
105+
return inboundTransformChain;
106+
};
107+
return inboundFunction.apply(new InboundTransformChain(content, size, chunkSize));
108+
}
109+
110+
public OutboundTransformChain outboundTransformChain(final InputStream uploadedData,
111+
final SegmentManifest manifest,
112+
final Chunk chunk) {
113+
return outboundTransformChain(uploadedData, manifest, List.of(chunk));
114+
}
115+
116+
public OutboundTransformChain outboundTransformChain(final InputStream uploadedData,
117+
final SegmentManifest manifest,
118+
final List<Chunk> chunks) {
119+
final Function<OutboundTransformChain, OutboundTransformChain> outboundFunction =
120+
outboundTransformChain -> {
121+
if (withEncryption) {
122+
outboundTransformChain.chain(
123+
outboundTransform ->
124+
new DecryptionChunkEnumeration(
125+
outboundTransform,
126+
ivSize,
127+
bytes -> outboundCipherSupplier.apply(bytes, manifest.encryption().get())));
128+
}
129+
if (withCompression) {
130+
outboundTransformChain.chain(DecompressionChunkEnumeration::new);
131+
}
132+
return outboundTransformChain;
133+
};
134+
return outboundFunction.apply(new OutboundTransformChain(uploadedData, chunks));
135+
}
136+
137+
public InputStream serializeSegmentManifest(final SegmentManifest segmentManifest) throws JsonProcessingException {
138+
return new ByteArrayInputStream(objectMapper.writeValueAsBytes(segmentManifest));
139+
}
140+
141+
public SegmentManifest deserializeSegmentManifestContent(final InputStream content) throws IOException {
142+
return objectMapper.readValue(content, SegmentManifestV1.class);
143+
}
144+
145+
public static class Builder {
146+
private int chunkSize;
147+
private boolean withEncryption = false;
148+
private int ivSize = -1;
149+
private Supplier<Cipher> inboundCipherSupplier = null;
150+
private BiFunction<byte[], SegmentEncryptionMetadata, Cipher> outboundCipherSupplier = null;
151+
private boolean withCompression = false;
152+
private DataKeyAndAAD dataKeyAndAAD;
153+
private RsaEncryptionProvider rsaEncryptionProvider;
154+
155+
private ObjectMapper getObjectMapper() {
156+
final ObjectMapper objectMapper = new ObjectMapper();
157+
objectMapper.registerModule(new Jdk8Module());
158+
if (withEncryption) {
159+
final SimpleModule simpleModule = new SimpleModule();
160+
simpleModule.addSerializer(SecretKey.class,
161+
new DataKeySerializer(rsaEncryptionProvider::encryptDataKey));
162+
simpleModule.addDeserializer(SecretKey.class, new DataKeyDeserializer(
163+
b -> new SecretKeySpec(rsaEncryptionProvider.decryptDataKey(b), "AES")));
164+
objectMapper.registerModule(simpleModule);
165+
}
166+
return objectMapper;
167+
}
168+
169+
public Builder withChunkSize(final int chunkSize) {
170+
this.chunkSize = chunkSize;
171+
return this;
172+
}
173+
174+
public Builder withEncryption(final Path publicKeyFile, final Path privateKeyFile) {
175+
rsaEncryptionProvider = RsaEncryptionProvider.of(publicKeyFile, privateKeyFile);
176+
final AesEncryptionProvider aesEncryptionProvider = new AesEncryptionProvider();
177+
dataKeyAndAAD = aesEncryptionProvider.createDataKeyAndAAD();
178+
ivSize = aesEncryptionProvider.encryptionCipher(dataKeyAndAAD).getIV().length;
179+
withEncryption = true;
180+
inboundCipherSupplier = () -> aesEncryptionProvider.encryptionCipher(dataKeyAndAAD);
181+
outboundCipherSupplier = aesEncryptionProvider::decryptionCipher;
182+
return this;
183+
}
184+
185+
public Builder withCompression() {
186+
withCompression = true;
187+
return this;
188+
}
189+
190+
public Builder fromConfig(final UniversalRemoteStorageManagerConfig config) {
191+
withChunkSize(config.chunkSize());
192+
if (config.compressionEnabled()) {
193+
withCompression();
194+
}
195+
if (config.encryptionEnabled()) {
196+
withEncryption(config.encryptionPublicKeyFile(), config.encryptionPrivateKeyFile());
197+
}
198+
return this;
199+
}
200+
201+
public TransformPipeline build() {
202+
return new TransformPipeline(
203+
chunkSize,
204+
withCompression,
205+
withEncryption,
206+
dataKeyAndAAD,
207+
ivSize,
208+
inboundCipherSupplier,
209+
outboundCipherSupplier,
210+
getObjectMapper());
211+
}
212+
}
213+
}

0 commit comments

Comments
 (0)