Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V2 migration s3 #2108

Merged
merged 31 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
31 commits
Select commit. Hold shift + click to select a range
d5f5b8f
update secrets manager to v2 in sdk
aimethed May 22, 2024
f889729
update ~half of connectors to use secrets manager v2
aimethed May 22, 2024
b717ea6
all others except ddb secrets v2
aimethed May 22, 2024
cd75a48
update ddb to secrets manager v2
aimethed May 24, 2024
fb591ee
remaining changes missed with initial passes
aimethed May 24, 2024
f11d7d0
updated glue to v2 in sdk
aimethed May 30, 2024
dfc3164
glue v2 for docdb and ddb
aimethed May 30, 2024
2616d10
glue v2 for hbase and elasticsearch
aimethed May 30, 2024
68a392c
glue v2 for gcs
aimethed May 30, 2024
4839562
glue v2 for neptune, redis, timestream
aimethed May 30, 2024
deeb559
glue v2 for kafka
aimethed May 30, 2024
40d76e4
glue v2 for msk
aimethed May 31, 2024
4e1fc93
removed hbase dependencies to shrink jar size
aimethed May 31, 2024
79d63b9
Merge branch 'v2-migration-secrets' into v2-migration-glue
aimethed May 31, 2024
04bd9f6
swap to apache http client for jar size reasons
aimethed May 31, 2024
37ff821
Remove unused dependency from hbase connector (#2021)
Trianz-Akshay Jun 14, 2024
35ee207
Remove unused dependency from bigquery connector (#2022)
Trianz-Akshay Jun 17, 2024
3ccf6e0
Merge branch 'master' into v2-migration-secrets
aimethed Jun 26, 2024
d35ea4b
Merge branch 'master' into v2-migration-glue
aimethed Jun 26, 2024
28b42dd
update ddb pom to use same v2 versions as sdk
aimethed Jun 27, 2024
ff03307
secrets manager v2 for clickhouse
aimethed Jun 27, 2024
c4fc0d6
Merge branch 'v2-migration-secrets' into v2-migration-glue
aimethed Jun 27, 2024
3be0aa1
attempt at v2 glue for merged kafka changes
aimethed Jun 27, 2024
fad93d7
Set Data format to GetSchemaVersionResponse object in test cases (#2057)
Jithendar12 Jun 28, 2024
7a5281d
migrated s3 to v2 in sdk
aimethed Jul 12, 2024
fe665b6
s3 v2 for cmdb, cloudwatch, docdb, dynamodb, example, hbase
aimethed Jul 12, 2024
cf0dd91
s3 v2 for elasticsearch, bigquery, jdbc, mysql, neptune, oracle, post…
aimethed Jul 12, 2024
7790e36
s3 v2 for cloudera hive, impala, datalakegen2, saphana, snowflake, sq…
aimethed Jul 15, 2024
8d00a97
s3 v2 for clickhouse, db2, db2as400, gcs, hortonworks hive, kafka, msk
aimethed Jul 15, 2024
41704e6
s3 v2 for vertica
aimethed Jul 18, 2024
3ad9732
resolved merge from v2-master
aimethed Jul 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import org.apache.arrow.util.VisibleForTesting;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -77,7 +77,7 @@ public AwsCmdbMetadataHandler(java.util.Map<String, String> configOptions)
protected AwsCmdbMetadataHandler(
TableProviderFactory tableProviderFactory,
EncryptionKeyFactory keyFactory,
AWSSecretsManager secretsManager,
SecretsManagerClient secretsManager,
AmazonAthena athena,
String spillBucket,
String spillPrefix,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import org.apache.arrow.util.VisibleForTesting;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.Map;

Expand Down Expand Up @@ -56,7 +56,7 @@ public AwsCmdbRecordHandler(java.util.Map<String, String> configOptions)
}

@VisibleForTesting
protected AwsCmdbRecordHandler(AmazonS3 amazonS3, AWSSecretsManager secretsManager, AmazonAthena athena, TableProviderFactory tableProviderFactory, java.util.Map<String, String> configOptions)
protected AwsCmdbRecordHandler(S3Client amazonS3, SecretsManagerClient secretsManager, AmazonAthena athena, TableProviderFactory tableProviderFactory, java.util.Map<String, String> configOptions)
{
super(amazonS3, secretsManager, athena, SOURCE_TYPE, configOptions);
tableProviders = tableProviderFactory.getTableProviders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.rds.AmazonRDS;
import com.amazonaws.services.rds.AmazonRDSClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.apache.arrow.util.VisibleForTesting;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -62,12 +61,12 @@ public TableProviderFactory(java.util.Map<String, String> configOptions)
AmazonEC2ClientBuilder.standard().build(),
AmazonElasticMapReduceClientBuilder.standard().build(),
AmazonRDSClientBuilder.standard().build(),
AmazonS3ClientBuilder.standard().build(),
S3Client.create(),
configOptions);
}

@VisibleForTesting
protected TableProviderFactory(AmazonEC2 ec2, AmazonElasticMapReduce emr, AmazonRDS rds, AmazonS3 amazonS3, java.util.Map<String, String> configOptions)
protected TableProviderFactory(AmazonEC2 ec2, AmazonElasticMapReduce emr, AmazonRDS rds, S3Client amazonS3, java.util.Map<String, String> configOptions)
{
addProvider(new Ec2TableProvider(ec2));
addProvider(new EbsTableProvider(ec2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.Owner;
import org.apache.arrow.vector.types.pojo.Schema;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Bucket;
import software.amazon.awssdk.services.s3.model.GetBucketAclRequest;
import software.amazon.awssdk.services.s3.model.GetBucketAclResponse;
import software.amazon.awssdk.services.s3.model.Owner;

/**
* Maps your S3 Objects to a table.
Expand All @@ -41,9 +43,9 @@ public class S3BucketsTableProvider
implements TableProvider
{
private static final Schema SCHEMA;
private AmazonS3 amazonS3;
private S3Client amazonS3;

public S3BucketsTableProvider(AmazonS3 amazonS3)
public S3BucketsTableProvider(S3Client amazonS3)
{
this.amazonS3 = amazonS3;
}
Expand Down Expand Up @@ -84,7 +86,7 @@ public GetTableResponse getTable(BlockAllocator blockAllocator, GetTableRequest
@Override
public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker)
{
for (Bucket next : amazonS3.listBuckets()) {
for (Bucket next : amazonS3.listBuckets().buckets()) {
toRow(next, spiller);
}
}
Expand All @@ -102,13 +104,15 @@ private void toRow(Bucket bucket,
{
spiller.writeRows((Block block, int row) -> {
boolean matched = true;
matched &= block.offerValue("bucket_name", row, bucket.getName());
matched &= block.offerValue("create_date", row, bucket.getCreationDate());
matched &= block.offerValue("bucket_name", row, bucket.name());
matched &= block.offerValue("create_date", row, bucket.creationDate());

Owner owner = bucket.getOwner();
GetBucketAclResponse response = amazonS3.getBucketAcl(GetBucketAclRequest.builder().bucket(bucket.name()).build());

Owner owner = response.owner();
if (owner != null) {
matched &= block.offerValue("owner_name", row, bucket.getOwner().getDisplayName());
matched &= block.offerValue("owner_id", row, bucket.getOwner().getId());
matched &= block.offerValue("owner_name", row, owner.displayName());
matched &= block.offerValue("owner_id", row, owner.id());
}

return matched ? 1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@
import com.amazonaws.athena.connector.lambda.metadata.GetTableResponse;
import com.amazonaws.athena.connector.lambda.records.ReadRecordsRequest;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.arrow.vector.types.pojo.Schema;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.Owner;
import software.amazon.awssdk.services.s3.model.S3Object;

/**
* Maps your S3 Objects to a table.
Expand All @@ -45,9 +45,9 @@ public class S3ObjectsTableProvider
{
private static final int MAX_KEYS = 1000;
private static final Schema SCHEMA;
private AmazonS3 amazonS3;
private S3Client amazonS3;

public S3ObjectsTableProvider(AmazonS3 amazonS3)
public S3ObjectsTableProvider(S3Client amazonS3)
{
this.amazonS3 = amazonS3;
}
Expand Down Expand Up @@ -98,42 +98,44 @@ public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsR
"(e.g. where bucket_name='my_bucket'.");
}

ListObjectsV2Request req = new ListObjectsV2Request().withBucketName(bucket).withMaxKeys(MAX_KEYS);
ListObjectsV2Result result;
ListObjectsV2Request req = ListObjectsV2Request.builder().bucket(bucket).maxKeys(MAX_KEYS).build();
ListObjectsV2Response response;
do {
result = amazonS3.listObjectsV2(req);
for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
toRow(objectSummary, spiller);
response = amazonS3.listObjectsV2(req);
for (S3Object s3Object : response.contents()) {
toRow(s3Object, spiller, bucket);
}
req.setContinuationToken(result.getNextContinuationToken());
req = req.toBuilder().continuationToken(response.nextContinuationToken()).build();
}
while (result.isTruncated() && queryStatusChecker.isQueryRunning());
while (response.isTruncated() && queryStatusChecker.isQueryRunning());
}

/**
* Maps a DBInstance into a row in our Apache Arrow response block(s).
*
* @param objectSummary The S3 ObjectSummary to map.
* @param s3Object The S3 object to map.
* @param spiller The BlockSpiller to use when we want to write a matching row to the response.
* @param bucket The name of the S3 bucket
* @note The current implementation is rather naive in how it maps fields. It leverages a static
* list of fields that we'd like to provide and then explicitly filters and converts each field.
*/
private void toRow(S3ObjectSummary objectSummary,
BlockSpiller spiller)
private void toRow(S3Object s3Object,
BlockSpiller spiller,
String bucket)
{
spiller.writeRows((Block block, int row) -> {
boolean matched = true;
matched &= block.offerValue("bucket_name", row, objectSummary.getBucketName());
matched &= block.offerValue("e_tag", row, objectSummary.getETag());
matched &= block.offerValue("key", row, objectSummary.getKey());
matched &= block.offerValue("bytes", row, objectSummary.getSize());
matched &= block.offerValue("storage_class", row, objectSummary.getStorageClass());
matched &= block.offerValue("last_modified", row, objectSummary.getLastModified());
matched &= block.offerValue("bucket_name", row, bucket);
matched &= block.offerValue("e_tag", row, s3Object.eTag());
matched &= block.offerValue("key", row, s3Object.key());
matched &= block.offerValue("bytes", row, s3Object.size());
matched &= block.offerValue("storage_class", row, s3Object.storageClassAsString());
matched &= block.offerValue("last_modified", row, s3Object.lastModified());

Owner owner = objectSummary.getOwner();
Owner owner = s3Object.owner();
if (owner != null) {
matched &= block.offerValue("owner_name", row, owner.getDisplayName());
matched &= block.offerValue("owner_id", row, owner.getId());
matched &= block.offerValue("owner_name", row, owner.displayName());
matched &= block.offerValue("owner_id", row, owner.id());
}

return matched ? 1 : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -75,7 +75,7 @@ public class AwsCmdbMetadataHandlerTest
private FederatedIdentity identity = new FederatedIdentity("arn", "account", Collections.emptyMap(), Collections.emptyList());

@Mock
private AmazonS3 mockS3;
private S3Client mockS3;

@Mock
private TableProviderFactory mockTableProviderFactory;
Expand All @@ -98,7 +98,7 @@ public class AwsCmdbMetadataHandlerTest
private Block mockBlock;

@Mock
private AWSSecretsManager mockSecretsManager;
private SecretsManagerClient mockSecretsManager;

@Mock
private AmazonAthena mockAthena;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.athena.connectors.aws.cmdb.tables.TableProvider;
import com.amazonaws.services.athena.AmazonAthena;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient;

import java.util.Collections;
import java.util.UUID;
Expand All @@ -62,7 +62,7 @@ public class AwsCmdbRecordHandlerTest
private FederatedIdentity identity = new FederatedIdentity("arn", "account", Collections.emptyMap(), Collections.emptyList());

@Mock
private AmazonS3 mockS3;
private S3Client mockS3;

@Mock
private TableProviderFactory mockTableProviderFactory;
Expand All @@ -77,7 +77,7 @@ public class AwsCmdbRecordHandlerTest
private TableProvider mockTableProvider;

@Mock
private AWSSecretsManager mockSecretsManager;
private SecretsManagerClient mockSecretsManager;

@Mock
private AmazonAthena mockAthena;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.rds.AmazonRDS;
import com.amazonaws.services.s3.AmazonS3;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.List;
import java.util.Map;
Expand All @@ -51,7 +51,7 @@ public class TableProviderFactoryTest
private AmazonRDS mockRds;

@Mock
private AmazonS3 amazonS3;
private S3Client amazonS3;

private TableProviderFactory factory = new TableProviderFactory(mockEc2, mockEmr, mockRds, amazonS3, com.google.common.collect.ImmutableMap.of());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@
import com.amazonaws.athena.connector.lambda.security.EncryptionKeyFactory;
import com.amazonaws.athena.connector.lambda.security.FederatedIdentity;
import com.amazonaws.athena.connector.lambda.security.LocalKeyFactory;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.google.common.io.ByteStreams;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -59,8 +54,16 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
Expand All @@ -74,8 +77,6 @@
import static com.amazonaws.athena.connector.lambda.domain.predicate.Constraints.DEFAULT_NO_LIMIT;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -99,7 +100,7 @@ public abstract class AbstractTableProviderTest
private final List<ByteHolder> mockS3Store = new ArrayList<>();

@Mock
private AmazonS3 amazonS3;
private S3Client amazonS3;

@Mock
private QueryStatusChecker queryStatusChecker;
Expand Down Expand Up @@ -129,24 +130,24 @@ public void setUp()
{
allocator = new BlockAllocatorImpl();

when(amazonS3.putObject(any()))
when(amazonS3.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
.thenAnswer((InvocationOnMock invocationOnMock) -> {
InputStream inputStream = ((PutObjectRequest) invocationOnMock.getArguments()[0]).getInputStream();
InputStream inputStream = ((RequestBody) invocationOnMock.getArguments()[1]).contentStreamProvider().newStream();
ByteHolder byteHolder = new ByteHolder();
byteHolder.setBytes(ByteStreams.toByteArray(inputStream));
mockS3Store.add(byteHolder);
return mock(PutObjectResult.class);
return PutObjectResponse.builder().build();
});

when(amazonS3.getObject(nullable(String.class), nullable(String.class)))
.thenAnswer((InvocationOnMock invocationOnMock) -> {
S3Object mockObject = mock(S3Object.class);
ByteHolder byteHolder = mockS3Store.get(0);
mockS3Store.remove(0);
when(mockObject.getObjectContent()).thenReturn(
new S3ObjectInputStream(
new ByteArrayInputStream(byteHolder.getBytes()), null));
return mockObject;
when(amazonS3.getObject(any(GetObjectRequest.class)))
.thenAnswer(new Answer<Object>()
{
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable
{
return new ResponseInputStream<>(GetObjectResponse.builder().build(), new ByteArrayInputStream(mockS3Store.get(0).getBytes()));
}
});

blockSpillReader = new S3BlockSpillReader(amazonS3, allocator);
Expand Down
Loading