refactor: introduce transform chains #160

Closed · wants to merge 14 commits
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
@@ -21,7 +21,9 @@
<suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
<suppress checks="ClassFanOutComplexity" files="S3ClientWrapper.java"/>
<suppress checks="ClassFanOutComplexity" files="UniversalRemoteStorageManager.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3ClientWrapper.java"/>
<suppress checks="ClassDataAbstractionCoupling" files="S3RemoteStorageManagerConfig.java"/>
<suppress checks="AbbreviationAsWordInName" files="DataKeyAndAADEqualsTest"/>
<suppress checks="ClassDataAbstractionCoupling" files="TransformPipeline.java"/>
</suppressions>
4 changes: 3 additions & 1 deletion commons/build.gradle
@@ -16,11 +16,13 @@

ext {
zstdVersion = "1.5.4-2"
jacksonVersion = "2.14.2"
}

dependencies {
// The client of a library should provide a runtime dependency. For now taking the version from kafka-storage-api
compileOnly 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
compileOnly "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion"
// The client of a library should provide a runtime dependency.
// For now taking the version from kafka-clients
compileOnly "com.github.luben:zstd-jni:$zstdVersion"
@@ -16,8 +16,8 @@

package io.aiven.kafka.tieredstorage.commons;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Future;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

@@ -26,9 +26,10 @@
public interface ChunkManager {
/**
* Gets a chunk of a segment.
* @return a future of {@link InputStream} of the chunk, plain text (i.e. decrypted and decompressed).
*
* @return an {@link InputStream} of the chunk, plain text (i.e. decrypted and decompressed).
*/
Future<InputStream> getChunk(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
SegmentManifest manifest,
int chunkId);
InputStream getChunk(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
SegmentManifest manifest,
int chunkId) throws IOException;
}
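With getChunk now returning the InputStream directly (and declaring IOException) instead of a Future, a caller no longer resolves a future per chunk. A rough usage sketch, assuming the caller already holds the segment metadata and manifest; the ChunkCopier helper is hypothetical and not part of this PR:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

import io.aiven.kafka.tieredstorage.commons.manifest.SegmentManifest;

// Hypothetical helper, assumed to sit alongside ChunkManager in io.aiven.kafka.tieredstorage.commons.
final class ChunkCopier {
    static void copyAllChunks(final ChunkManager chunkManager,
                              final RemoteLogSegmentMetadata metadata,
                              final SegmentManifest manifest,
                              final OutputStream out) throws IOException {
        final int chunkCount = manifest.chunkIndex().chunks().size();
        for (int chunkId = 0; chunkId < chunkCount; chunkId++) {
            // Each chunk arrives already detransformed (decrypted and decompressed).
            try (final InputStream chunk = chunkManager.getChunk(metadata, manifest, chunkId)) {
                chunk.transferTo(out);
            }
        }
    }
}
```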
@@ -16,54 +16,167 @@

package io.aiven.kafka.tieredstorage.commons;


import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.server.log.remote.storage.LogSegmentData;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

public class UniversalRemoteStorageManager implements RemoteStorageManager {
import io.aiven.kafka.tieredstorage.commons.manifest.SegmentManifest;
import io.aiven.kafka.tieredstorage.commons.storage.ObjectStorageFactory;
import io.aiven.kafka.tieredstorage.commons.transform.DetransformFinisher;
import io.aiven.kafka.tieredstorage.commons.transform.FetchChunkEnumeration;
import io.aiven.kafka.tieredstorage.commons.transform.TransformFinisher;
import io.aiven.kafka.tieredstorage.commons.transform.TransformPipeline;

import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.LEADER_EPOCH;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.OFFSET;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.TIMESTAMP;
import static org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType.TRANSACTION;

public class UniversalRemoteStorageManager implements RemoteStorageManager, ChunkManager {
private TransformPipeline pipeline;
private ObjectStorageFactory objectStorageFactory;

private ObjectKey objectKey;

@Override
public void configure(final Map<String, ?> configs) {
throw new UnsupportedOperationException();
Objects.requireNonNull(configs, "configs must not be null");
final UniversalRemoteStorageManagerConfig config = new UniversalRemoteStorageManagerConfig(configs);
pipeline = TransformPipeline.newBuilder().fromConfig(config).build();
objectStorageFactory = config.objectStorageFactory();
objectKey = new ObjectKey(config.keyPrefix());
}

@Override
public void copyLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final LogSegmentData logSegmentData) throws RemoteStorageException {
throw new UnsupportedOperationException();
public void copyLogSegmentData(final RemoteLogSegmentMetadata segmentMetadata,
final LogSegmentData segmentData) throws RemoteStorageException {
Objects.requireNonNull(segmentMetadata, "segmentMetadata must not be null");
Objects.requireNonNull(segmentData, "segmentData must not be null");
try {
final TransformFinisher complete = pipeline.inboundTransformChain(segmentData.logSegment()).complete();
try (final var sis = complete.sequence()) {
final String fileKey = objectKey.key(segmentMetadata, ObjectKey.Suffix.LOG);
objectStorageFactory.fileUploader().upload(sis, fileKey);
}

final SegmentManifest segmentManifest = pipeline.segmentManifest(complete);
uploadManifest(segmentMetadata, segmentManifest);
uploadIndexFile(segmentMetadata,
Files.newInputStream(segmentData.offsetIndex()), OFFSET);
uploadIndexFile(segmentMetadata,
Files.newInputStream(segmentData.timeIndex()), TIMESTAMP);
uploadIndexFile(segmentMetadata,
Files.newInputStream(segmentData.producerSnapshotIndex()), PRODUCER_SNAPSHOT);
if (segmentData.transactionIndex().isPresent()) {
uploadIndexFile(segmentMetadata,
Files.newInputStream(segmentData.transactionIndex().get()), TRANSACTION);
}
uploadIndexFile(segmentMetadata,
new ByteBufferInputStream(segmentData.leaderEpochIndex()), LEADER_EPOCH);
} catch (final IOException e) {
throw new RemoteStorageException(e);
}
}
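For readers new to the "transform chain" wording in the PR title: on the upload path the segment is split into chunks, each chunk is (presumably) compressed and then encrypted when those features are enabled, and the per-chunk transformed positions end up in the manifest's chunk index. The following is a conceptual, self-contained sketch of that inbound ordering using plain JDK streams; it is not the project's pipeline, which uses zstd and its own transform classes:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.util.zip.DeflaterInputStream;

import javax.crypto.Cipher;
import javax.crypto.CipherInputStream;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Conceptual only: Deflater + default AES stand in for the real zstd and AES transforms.
public class InboundChainSketch {
    public static void main(final String[] args) throws IOException, GeneralSecurityException {
        final byte[] chunk = "one chunk of a log segment".getBytes();

        final SecretKey dataKey = KeyGenerator.getInstance("AES").generateKey();
        final Cipher cipher = Cipher.getInstance("AES");
        cipher.init(Cipher.ENCRYPT_MODE, dataKey);

        InputStream in = new ByteArrayInputStream(chunk);
        in = new DeflaterInputStream(in);        // compression stage
        in = new CipherInputStream(in, cipher);  // encryption stage

        // The transformed bytes are what gets uploaded; their per-chunk length is
        // what a chunk index has to record as the transformed size.
        System.out.println("transformed size: " + in.readAllBytes().length);
    }
}
```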

private void uploadIndexFile(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final InputStream index,
final IndexType indexType) throws IOException {
objectStorageFactory.fileUploader().upload(index,
objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType)));
}

private void uploadManifest(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final SegmentManifest segmentManifest)
throws IOException {
final InputStream manifestContent = pipeline.serializeSegmentManifest(segmentManifest);
final String manifestFileKey = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST);

objectStorageFactory.fileUploader().upload(manifestContent, manifestFileKey);
}

@Override
public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final int startPosition) throws RemoteStorageException {
throw new UnsupportedOperationException();
return this.fetchLogSegment(remoteLogSegmentMetadata, startPosition,
remoteLogSegmentMetadata.segmentSizeInBytes());
}

@Override
public InputStream fetchLogSegment(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final int startPosition,
final int endPosition) throws RemoteStorageException {
throw new UnsupportedOperationException();
try {
final InputStream manifest = objectStorageFactory.fileFetcher()
.fetch(objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.MANIFEST));

final SegmentManifest segmentManifest = pipeline.deserializeSegmentManifestContent(manifest);
final FetchChunkEnumeration fetchChunkEnumeration = new FetchChunkEnumeration(
this,
remoteLogSegmentMetadata,
segmentManifest,
startPosition,
endPosition);
return new SequenceInputStream(fetchChunkEnumeration);
} catch (final IOException e) {
throw new RemoteStorageException(e);
}
}
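fetchLogSegment leans on SequenceInputStream pulling the next stream from its Enumeration lazily, so FetchChunkEnumeration only has to fetch and detransform a chunk once the reader has consumed the previous one. A standalone illustration of that JDK pattern; the anonymous enumeration below merely stands in for FetchChunkEnumeration:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;

public class LazyConcatDemo {
    public static void main(final String[] args) throws IOException {
        final Iterator<byte[]> chunks = List.of("chunk-0|".getBytes(), "chunk-1|".getBytes()).iterator();
        final Enumeration<InputStream> lazyChunks = new Enumeration<InputStream>() {
            @Override
            public boolean hasMoreElements() {
                return chunks.hasNext();
            }

            @Override
            public InputStream nextElement() {
                // In FetchChunkEnumeration this is where the ranged object-storage
                // fetch and the outbound (detransform) chain would run.
                return new ByteArrayInputStream(chunks.next());
            }
        };
        try (final InputStream segment = new SequenceInputStream(lazyChunks)) {
            System.out.println(new String(segment.readAllBytes())); // chunk-0|chunk-1|
        }
    }
}
```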

@Override
public InputStream getChunk(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final SegmentManifest manifest,
final int chunkId) throws IOException {
final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
final InputStream segmentFile = objectStorageFactory.fileFetcher()
.fetch(objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.LOG),
chunk.transformedPosition,
chunk.transformedPosition + chunk.transformedSize);
final DetransformFinisher complete = pipeline.outboundTransformChain(
segmentFile,
manifest,
List.of(chunk)
).complete();
return complete.nextElement();
}

@Override
public InputStream fetchIndex(final RemoteLogSegmentMetadata remoteLogSegmentMetadata,
final IndexType indexType) throws RemoteStorageException {
throw new UnsupportedOperationException();
try {
final String key = objectKey.key(remoteLogSegmentMetadata, ObjectKey.Suffix.fromIndexType(indexType));
return objectStorageFactory.fileFetcher().fetch(key);
} catch (final IOException e) {
throw new RemoteStorageException(e);
}

}

@Override
public void deleteLogSegmentData(final RemoteLogSegmentMetadata remoteLogSegmentMetadata)
throws RemoteStorageException {
throw new UnsupportedOperationException();
try {
for (final ObjectKey.Suffix suffix : ObjectKey.Suffix.values()) {
objectStorageFactory.fileDeleter()
.delete(objectKey.key(remoteLogSegmentMetadata, suffix));
}
} catch (final IOException e) {
throw new RemoteStorageException(e);
}
}

@Override
public void close() throws IOException {
throw new UnsupportedOperationException();
public void close() {
}
}
@@ -77,7 +77,8 @@ public class UniversalRemoteStorageManagerConfig extends AbstractConfig {
CHUNK_SIZE_CONFIG,
ConfigDef.Type.INT,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Range.between(1, Integer.MAX_VALUE),
// TODO: figure out a sensible limit because Integer.MAX_VALUE leads to overflow during encryption
ConfigDef.Range.between(1, Integer.MAX_VALUE / 2),
ConfigDef.Importance.HIGH,
CHUNK_SIZE_DOC
);
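Regarding the TODO above: with encryption enabled the stored chunk is larger than the configured chunk size (IV, padding, and similar overhead), so a chunk size near Integer.MAX_VALUE can push the transformed size past the int range. The Integer.MAX_VALUE / 2 cap leaves ample headroom. A toy illustration of the overflow; the 32-byte overhead is an assumption, not the pipeline's real figure:

```java
public class ChunkSizeOverflowSketch {
    public static void main(final String[] args) {
        final int chunkSize = Integer.MAX_VALUE;   // 2147483647
        final int perChunkOverhead = 16 + 16;      // assumed IV + padding, in bytes
        final int transformedSize = chunkSize + perChunkOverhead;
        System.out.println(transformedSize);                      // negative: the int wrapped around
        System.out.println((long) chunkSize + perChunkOverhead);  // the intended value
    }
}
```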
@@ -137,31 +138,31 @@ ObjectStorageFactory objectStorageFactory() {
return objectFactory;
}

String keyPrefix() {
public String keyPrefix() {
return getString(OBJECT_STORAGE_KEY_PREFIX_CONFIG);
}

int chunkSize() {
public int chunkSize() {
return getInt(CHUNK_SIZE_CONFIG);
}

boolean compressionEnabled() {
public boolean compressionEnabled() {
return getBoolean(COMPRESSION_CONFIG);
}

boolean encryptionEnabled() {
public boolean encryptionEnabled() {
return getBoolean(ENCRYPTION_CONFIG);
}

Path encryptionPublicKeyFile() {
public Path encryptionPublicKeyFile() {
final String value = getString(ENCRYPTION_PUBLIC_KEY_FILE_CONFIG);
if (value == null) {
return null;
}
return Path.of(value);
}

Path encryptionPrivateKeyFile() {
public Path encryptionPrivateKeyFile() {
final String value = getString(ENCRYPTION_PRIVATE_KEY_FILE_CONFIG);
if (value == null) {
return null;
@@ -22,6 +22,9 @@
import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;

import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;

import io.aiven.kafka.tieredstorage.commons.manifest.SegmentEncryptionMetadata;

import static io.aiven.kafka.tieredstorage.commons.security.RsaEncryptionProvider.KEY_SIZE;
@@ -33,7 +36,15 @@ public class AesEncryptionProvider implements Encryption, Decryption {

private final KeyGenerator aesKeyGenerator;

public AesEncryptionProvider(final KeyGenerator aesKeyGenerator) {
public static AesEncryptionProvider of(final RsaEncryptionProvider rsaEncryptionProvider) {
try {
return new AesEncryptionProvider(rsaEncryptionProvider.keyGenerator());
} catch (NoSuchAlgorithmException | NoSuchProviderException e) {
throw new RuntimeException(e);
}
}

AesEncryptionProvider(final KeyGenerator aesKeyGenerator) {
this.aesKeyGenerator = aesKeyGenerator;
}

@@ -25,6 +25,8 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.NoSuchAlgorithmException;
@@ -56,8 +58,8 @@ private RsaEncryptionProvider(final KeyPair rsaKeyPair) {
this.rsaKeyPair = rsaKeyPair;
}

public static RsaEncryptionProvider of(final InputStream rsaPublicKey,
final InputStream rsaPrivateKey) {
public static RsaEncryptionProvider of(final Path rsaPublicKey,
final Path rsaPrivateKey) {
LOGGER.info("Read RSA keys");
Objects.requireNonNull(rsaPublicKey, "rsaPublicKey hasn't been set");
Objects.requireNonNull(rsaPrivateKey, "rsaPrivateKey hasn't been set");
@@ -91,12 +93,12 @@ public byte[] decryptDataKey(final byte[] bytes) {

static class RsaKeysReader {

static KeyPair readRsaKeyPair(final InputStream publicKeyIn, final InputStream privateKeyIn) {
static KeyPair readRsaKeyPair(final Path publicKeyIn, final Path privateKeyIn) {
try {
final var publicKey = readPublicKey(publicKeyIn);
final var privateKey = readPrivateKey(privateKeyIn);
final var publicKey = readPublicKey(Files.newInputStream(publicKeyIn));
final var privateKey = readPrivateKey(Files.newInputStream(privateKeyIn));
return new KeyPair(publicKey, privateKey);
} catch (NoSuchAlgorithmException | InvalidKeySpecException e) {
} catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
throw new IllegalArgumentException("Couldn't read RSA key pair", e);
}
}
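Taken together with the AesEncryptionProvider.of(...) factory above, the Path-based RsaEncryptionProvider.of(...) lets both providers be wired straight from the config's key-file settings. A hypothetical wiring sketch, not code from this PR; it assumes both classes live in io.aiven.kafka.tieredstorage.commons.security:

```java
import java.nio.file.Path;

import io.aiven.kafka.tieredstorage.commons.security.AesEncryptionProvider;
import io.aiven.kafka.tieredstorage.commons.security.RsaEncryptionProvider;

public class EncryptionWiringSketch {
    static AesEncryptionProvider buildAesProvider(final Path publicKeyFile, final Path privateKeyFile) {
        // Reads the PEM key pair from disk; an unreadable key surfaces as IllegalArgumentException.
        final RsaEncryptionProvider rsaProvider = RsaEncryptionProvider.of(publicKeyFile, privateKeyFile);
        // of(...) wraps the checked key-generator exceptions in a RuntimeException.
        return AesEncryptionProvider.of(rsaProvider);
    }
}
```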