Skip to content

Commit

Permalink
V2 migration s3 (#2108)
Browse files Browse the repository at this point in the history
  • Loading branch information
aimethed authored Jul 25, 2024
1 parent a60ed61 commit 2545c7e
Show file tree
Hide file tree
Showing 133 changed files with 1,258 additions and 1,035 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.amazonaws.athena.connector.lambda.handlers.RecordHandler;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.arrow.util.VisibleForTesting;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.Map;
Expand Down Expand Up @@ -56,7 +56,7 @@ public AwsCmdbRecordHandler(java.util.Map<String, String> configOptions)
}

@VisibleForTesting
protected AwsCmdbRecordHandler(AmazonS3 amazonS3, SecretsManagerClient secretsManager, AthenaClient athena, TableProviderFactory tableProviderFactory, java.util.Map<String, String> configOptions)
protected AwsCmdbRecordHandler(S3Client amazonS3, SecretsManagerClient secretsManager, AthenaClient athena, TableProviderFactory tableProviderFactory, java.util.Map<String, String> configOptions)
{
super(amazonS3, secretsManager, athena, SOURCE_TYPE, configOptions);
tableProviders = tableProviderFactory.getTableProviders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.rds.AmazonRDS;
import com.amazonaws.services.rds.AmazonRDSClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.arrow.util.VisibleForTesting;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -62,12 +61,12 @@ public TableProviderFactory(java.util.Map<String, String> configOptions)
AmazonEC2ClientBuilder.standard().build(),
AmazonElasticMapReduceClientBuilder.standard().build(),
AmazonRDSClientBuilder.standard().build(),
AmazonS3ClientBuilder.standard().build(),
S3Client.create(),
configOptions);
}

@VisibleForTesting
protected TableProviderFactory(AmazonEC2 ec2, AmazonElasticMapReduce emr, AmazonRDS rds, AmazonS3 amazonS3, java.util.Map<String, String> configOptions)
protected TableProviderFactory(AmazonEC2 ec2, AmazonElasticMapReduce emr, AmazonRDS rds, S3Client amazonS3, java.util.Map<String, String> configOptions)
{
addProvider(new Ec2TableProvider(ec2));
addProvider(new EbsTableProvider(ec2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.Owner;
import org.apache.arrow.vector.types.pojo.Schema;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.GetBucketAclRequest;
import software.amazon.awssdk.services.s3.model.GetBucketAclResponse;
import software.amazon.awssdk.services.s3.model.Owner;

/**
* Maps your S3 Objects to a table.
Expand All @@ -41,9 +43,9 @@ public class S3BucketsTableProvider
implements TableProvider
{
private static final Schema SCHEMA;
private AmazonS3 amazonS3;
private S3Client amazonS3;

public S3BucketsTableProvider(AmazonS3 amazonS3)
public S3BucketsTableProvider(S3Client amazonS3)
{
this.amazonS3 = amazonS3;
}
Expand Down Expand Up @@ -84,7 +86,7 @@ public GetTableResponse getTable(BlockAllocator blockAllocator, GetTableRequest
@Override
public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker)
{
for (Bucket next : amazonS3.listBuckets()) {
for (Bucket next : amazonS3.listBuckets().buckets()) {
toRow(next, spiller);
}
}
Expand All @@ -102,13 +104,15 @@ private void toRow(Bucket bucket,
{
spiller.writeRows((Block block, int row) -> {
boolean matched = true;
matched &= block.offerValue("bucket_name", row, bucket.getName());
matched &= block.offerValue("create_date", row, bucket.getCreationDate());
matched &= block.offerValue("bucket_name", row, bucket.name());
matched &= block.offerValue("create_date", row, bucket.creationDate());

Owner owner = bucket.getOwner();
GetBucketAclResponse response = amazonS3.getBucketAcl(GetBucketAclRequest.builder().bucket(bucket.name()).build());

Owner owner = response.owner();
if (owner != null) {
matched &= block.offerValue("owner_name", row, bucket.getOwner().getDisplayName());
matched &= block.offerValue("owner_id", row, bucket.getOwner().getId());
matched &= block.offerValue("owner_name", row, owner.displayName());
matched &= block.offerValue("owner_id", row, owner.id());
}

return matched ? 1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.arrow.vector.types.pojo.Schema;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.Owner;
import software.amazon.awssdk.services.s3.model.S3Object;

/**
* Maps your S3 Objects to a table.
Expand All @@ -45,9 +45,9 @@ public class S3ObjectsTableProvider
{
private static final int MAX_KEYS = 1000;
private static final Schema SCHEMA;
private AmazonS3 amazonS3;
private S3Client amazonS3;

public S3ObjectsTableProvider(AmazonS3 amazonS3)
public S3ObjectsTableProvider(S3Client amazonS3)
{
this.amazonS3 = amazonS3;
}
Expand Down Expand Up @@ -98,42 +98,44 @@ public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsR
"(e.g. where bucket_name='my_bucket'.");
}

ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucket).withMaxKeys(MAX_KEYS);
ListObjectsV2Result result;
ListObjectsV2Request req = ListObjectsV2Request.builder().bucket(bucket).maxKeys(MAX_KEYS).build();
ListObjectsV2Response response;
do {
result = amazonS3.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
toRow(objectSummary, spiller);
response = amazonS3.listObjectsV2(req);
for (S3Object s3Object : response.contents()) {
toRow(s3Object, spiller, bucket);
}
req.setContinuationToken(result.getNextContinuationToken());
req = req.toBuilder().continuationToken(response.nextContinuationToken()).build();
}
while (result.isTruncated() && queryStatusChecker.isQueryRunning());
while (response.isTruncated() && queryStatusChecker.isQueryRunning());
}

/**
* Maps a DBInstance into a row in our Apache Arrow response block(s).
*
* @param objectSummary The S3 ObjectSummary to map.
* @param s3Object The S3 object to map.
* @param spiller The BlockSpiller to use when we want to write a matching row to the response.
* @param bucket The name of the S3 bucket
* @note The current implementation is rather naive in how it maps fields. It leverages a static
* list of fields that we'd like to provide and then explicitly filters and converts each field.
*/
private void toRow(S3ObjectSummary objectSummary,
BlockSpiller spiller)
private void toRow(S3Object s3Object,
BlockSpiller spiller,
String bucket)
{
spiller.writeRows((Block block, int row) -> {
boolean matched = true;
matched &= block.offerValue("bucket_name", row, objectSummary.getBucketName());
matched &= block.offerValue("e_tag", row, objectSummary.getETag());
matched &= block.offerValue("key", row, objectSummary.getKey());
matched &= block.offerValue("bytes", row, objectSummary.getSize());
matched &= block.offerValue("storage_class", row, objectSummary.getStorageClass());
matched &= block.offerValue("last_modified", row, objectSummary.getLastModified());
matched &= block.offerValue("bucket_name", row, bucket);
matched &= block.offerValue("e_tag", row, s3Object.eTag());
matched &= block.offerValue("key", row, s3Object.key());
matched &= block.offerValue("bytes", row, s3Object.size());
matched &= block.offerValue("storage_class", row, s3Object.storageClassAsString());
matched &= block.offerValue("last_modified", row, s3Object.lastModified());

Owner owner = objectSummary.getOwner();
Owner owner = s3Object.owner();
if (owner != null) {
matched &= block.offerValue("owner_name", row, owner.getDisplayName());
matched &= block.offerValue("owner_id", row, owner.getId());
matched &= block.offerValue("owner_name", row, owner.displayName());
matched &= block.offerValue("owner_id", row, owner.id());
}

return matched ? 1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.ArrayList;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class AwsCmdbMetadataHandlerTest
private FederatedIdentity identity = new FederatedIdentity("arn", "account", Collections.emptyMap(), Collections.emptyList());

@Mock
private AmazonS3 mockS3;
private S3Client mockS3;

@Mock
private TableProviderFactory mockTableProviderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.athena.AthenaClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.Collections;
Expand All @@ -62,7 +62,7 @@ public class AwsCmdbRecordHandlerTest
private FederatedIdentity identity = new FederatedIdentity("arn", "account", Collections.emptyMap(), Collections.emptyList());

@Mock
private AmazonS3 mockS3;
private S3Client mockS3;

@Mock
private TableProviderFactory mockTableProviderFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.rds.AmazonRDS;
import com.amazonaws.services.s3.AmazonS3;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.List;
import java.util.Map;
Expand All @@ -51,7 +51,7 @@ public class TableProviderFactoryTest
private AmazonRDS mockRds;

@Mock
private AmazonS3 amazonS3;
private S3Client amazonS3;

private TableProviderFactory factory = new TableProviderFactory(mockEc2, mockEmr, mockRds, amazonS3, com.google.common.collect.ImmutableMap.of());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.io.ByteStreams;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -59,8 +54,16 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand All @@ -74,8 +77,6 @@
import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -99,7 +100,7 @@ public abstract class AbstractTableProviderTest
private final List<ByteHolder> mockS3Store = new ArrayList<>();

@Mock
private AmazonS3 amazonS3;
private S3Client amazonS3;

@Mock
private QueryStatusChecker queryStatusChecker;
Expand Down Expand Up @@ -129,24 +130,24 @@ public void setUp()
{
allocator = new BlockAllocatorImpl();

when(amazonS3.putObject(any()))
when(amazonS3.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
.thenAnswer((InvocationOnMock invocationOnMock) -> {
InputStream inputStream = ((PutObjectRequest) invocationOnMock.getArguments()[0]).getInputStream();
InputStream inputStream = ((RequestBody) invocationOnMock.getArguments()[1]).contentStreamProvider().newStream();
ByteHolder byteHolder = new ByteHolder();
byteHolder.setBytes(ByteStreams.toByteArray(inputStream));
mockS3Store.add(byteHolder);
return mock(PutObjectResult.class);
return PutObjectResponse.builder().build();
});

when(amazonS3.getObject(nullable(String.class), nullable(String.class)))
.thenAnswer((InvocationOnMock invocationOnMock) -> {
S3Object mockObject = mock(S3Object.class);
ByteHolder byteHolder = mockS3Store.get(0);
mockS3Store.remove(0);
when(mockObject.getObjectContent()).thenReturn(
new S3ObjectInputStream(
new ByteArrayInputStream(byteHolder.getBytes()), null));
return mockObject;
when(amazonS3.getObject(any(GetObjectRequest.class)))
.thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable
{
return new ResponseInputStream<>(GetObjectResponse.builder().build(), new ByteArrayInputStream(mockS3Store.get(0).getBytes()));
}
});

blockSpillReader = new S3BlockSpillReader(amazonS3, allocator);
Expand Down
Loading

0 comments on commit 2545c7e

Please sign in to comment.