Skip to content

Commit

Permalink
Adds new configurations to the S3 source to better define bucket owne…
Browse files Browse the repository at this point in the history
…rship. Resolves #2012. (#3012)

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable authored Jul 26, 2023
1 parent 4c4677b commit a19d71d
Show file tree
Hide file tree
Showing 5 changed files with 327 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import jakarta.validation.constraints.NotNull;

import java.time.Duration;
import java.util.Map;

public class S3SourceConfig {
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(10);
Expand Down Expand Up @@ -62,6 +63,12 @@ public class S3SourceConfig {
@JsonProperty("disable_bucket_ownership_validation")
private boolean disableBucketOwnershipValidation = false;

@JsonProperty("bucket_owners")
private Map<String, String> bucketOwners;

@JsonProperty("default_bucket_owner")
private String defaultBucketOwner;

@JsonProperty("metadata_root_key")
private String metadataRootKey = DEFAULT_METADATA_ROOT_KEY;
@JsonProperty("s3_select")
Expand Down Expand Up @@ -135,4 +142,11 @@ public S3ScanScanOptions getS3ScanScanOptions() {
return s3ScanScanOptions;
}

public Map<String, String> getBucketOwners() {
return bucketOwners;
}

public String getDefaultBucketOwner() {
return defaultBucketOwner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.opensearch.dataprepper.plugins.source.StsArnRole;

import java.net.MalformedURLException;
import java.util.Objects;

/**
* Produces a {@link BucketOwnerProvider} from the S3 source configuration as
Expand All @@ -25,8 +24,21 @@ public class ConfigBucketOwnerProviderFactory {
public BucketOwnerProvider createBucketOwnerProvider(final S3SourceConfig s3SourceConfig) {
if(s3SourceConfig.isDisableBucketOwnershipValidation())
return new NoOwnershipBucketOwnerProvider();
StaticBucketOwnerProvider staticBucketOwnerProvider = getStaticBucketOwnerProvider(s3SourceConfig);

if(s3SourceConfig.getBucketOwners() != null && !s3SourceConfig.getBucketOwners().isEmpty()) {
return new MappedBucketOwnerProvider(s3SourceConfig.getBucketOwners(), staticBucketOwnerProvider);
} else {
return staticBucketOwnerProvider;
}
}

private StaticBucketOwnerProvider getStaticBucketOwnerProvider(S3SourceConfig s3SourceConfig) {
final String accountId;
if(Objects.nonNull(s3SourceConfig.getSqsOptions()))

if(s3SourceConfig.getDefaultBucketOwner() != null)
accountId = s3SourceConfig.getDefaultBucketOwner();
else if(s3SourceConfig.getSqsOptions() != null)
accountId = extractQueueAccountId(s3SourceConfig);
else
accountId = extractStsRoleArnAccountId(s3SourceConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.opensearch.dataprepper.plugins.source.ownership;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
* Implements {@link BucketOwnerProvider} using a mapping of bucket
* names to account Ids for the bucket owners. Uses a delegate
* {@link BucketOwnerProvider} as a fallback when the bucket is not
* found in the map.
*/
class MappedBucketOwnerProvider implements BucketOwnerProvider {
private final Map<String, String> bucketOwnershipMap;
private final BucketOwnerProvider fallbackProvider;

MappedBucketOwnerProvider(Map<String, String> bucketOwnershipMap, BucketOwnerProvider fallbackProvider) {
this.bucketOwnershipMap = new HashMap<>(Objects.requireNonNull(bucketOwnershipMap));
this.fallbackProvider = Objects.requireNonNull(fallbackProvider);
}

@Override
public Optional<String> getBucketOwner(String bucket) {
String account = bucketOwnershipMap.get(bucket);
if(account != null) {
return Optional.of(account);
}
return fallbackProvider.getBucketOwner(bucket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,40 @@

package org.opensearch.dataprepper.plugins.source.ownership;

import org.opensearch.dataprepper.plugins.source.S3SourceConfig;
import org.opensearch.dataprepper.plugins.source.SqsQueueUrl;
import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.plugins.source.S3SourceConfig;
import org.opensearch.dataprepper.plugins.source.configuration.AwsAuthenticationOptions;
import org.opensearch.dataprepper.plugins.source.configuration.SqsOptions;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ConfigBucketOwnerProviderFactoryTest {

@Mock
private S3SourceConfig s3SourceConfig;
private String accountId;

@BeforeEach
void setUp() {
accountId = RandomStringUtils.randomNumeric(12);
}

private ConfigBucketOwnerProviderFactory createObjectUnderTest() {
return new ConfigBucketOwnerProviderFactory();
Expand All @@ -45,24 +54,145 @@ void createBucketOwnerProvider_returns_NoOwnershipBucketOwnerProvider_when_disab
}

@Test
void createBucketOwnerProvider_returns_ownership_based_on_SQS_queueUrl() {
final SqsOptions sqsOptions = mock(SqsOptions.class);
final String accountId = UUID.randomUUID().toString();
final String sqsUrl = UUID.randomUUID().toString();
when(sqsOptions.getSqsUrl()).thenReturn(sqsUrl);
when(s3SourceConfig.getSqsOptions()).thenReturn(sqsOptions);
void createBucketOwnerProvider_returns_ownership_using_default_when_no_bucket_mapping() {
when(s3SourceConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
when(s3SourceConfig.getDefaultBucketOwner()).thenReturn(accountId);

BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

final String bucket = UUID.randomUUID().toString();
final Optional<String> optionalOwner = bucketOwnerProvider.getBucketOwner(bucket);

assertThat(optionalOwner, notNullValue());
assertThat(optionalOwner.isPresent(), equalTo(true));
assertThat(optionalOwner.get(), equalTo(accountId));
}

@Test
void createBucketOwnerProvider_returns_ownership_using_default_when_bucket_mapping_does_not_match() {
when(s3SourceConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
when(s3SourceConfig.getBucketOwners()).thenReturn(Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
when(s3SourceConfig.getDefaultBucketOwner()).thenReturn(accountId);

BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

final String bucket = UUID.randomUUID().toString();
final Optional<String> optionalOwner = bucketOwnerProvider.getBucketOwner(bucket);

assertThat(optionalOwner, notNullValue());
assertThat(optionalOwner.isPresent(), equalTo(true));
assertThat(optionalOwner.get(), equalTo(accountId));
}

@Nested
class WithSqsQueueUrl {

@BeforeEach
void setUp() {
final SqsOptions sqsOptions = mock(SqsOptions.class);
final String sqsUrl = String.format("https://sqs.us-east-1.amazonaws.com/%s/MyQueue", accountId);
lenient().when(sqsOptions.getSqsUrl()).thenReturn(sqsUrl);
lenient().when(s3SourceConfig.getSqsOptions()).thenReturn(sqsOptions);
lenient().when(s3SourceConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
}

@Test
void createBucketOwnerProvider_returns_MappedBucketOwnerProvider_when_bucketOwners_defined() {
when(s3SourceConfig.getBucketOwners()).thenReturn(Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()));

final BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, instanceOf(MappedBucketOwnerProvider.class));
}

@Test
void createBucketOwnerProvider_returns_ownership_based_on_bucket_owners_map() {
final String bucket = UUID.randomUUID().toString();
when(s3SourceConfig.getBucketOwners()).thenReturn(Map.of(bucket, accountId));

BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

final Optional<String> optionalOwner = bucketOwnerProvider.getBucketOwner(bucket);

assertThat(optionalOwner, notNullValue());
assertThat(optionalOwner.isPresent(), equalTo(true));
assertThat(optionalOwner.get(), equalTo(accountId));
}

@Test
void createBucketOwnerProvider_returns_ownership_using_SQS_queue_URL_when_bucket_not_in_bucket_map() {
when(s3SourceConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
when(s3SourceConfig.getBucketOwners()).thenReturn(Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()));

BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

final String bucket = UUID.randomUUID().toString();
final Optional<String> optionalOwner = bucketOwnerProvider.getBucketOwner(bucket);

assertThat(optionalOwner, notNullValue());
assertThat(optionalOwner.isPresent(), equalTo(true));
assertThat(optionalOwner.get(), equalTo(accountId));
}

@Test
void createBucketOwnerProvider_returns_StaticBucketOwnerProvider_when_bucketOwners_not_defined() {
final BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, instanceOf(StaticBucketOwnerProvider.class));
}

final SqsQueueUrl sqsQueueUrl = mock(SqsQueueUrl.class);
when(sqsQueueUrl.getAccountId()).thenReturn(accountId);

final BucketOwnerProvider bucketOwnerProvider;
try (final MockedStatic<SqsQueueUrl> sqsQueueUrlMockedStatic = mockStatic(SqsQueueUrl.class)) {
sqsQueueUrlMockedStatic.when(() -> SqsQueueUrl.parse(sqsUrl))
.thenReturn(sqsQueueUrl);
bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);
@Test
void createBucketOwnerProvider_returns_ownership_based_on_SQS_queueUrl() {
final BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

final Optional<String> optionalOwner = bucketOwnerProvider.getBucketOwner(UUID.randomUUID().toString());

assertThat(optionalOwner, notNullValue());
assertThat(optionalOwner.isPresent(), equalTo(true));
assertThat(optionalOwner.get(), equalTo(accountId));
}

@Test
void createBucketOwnerProvider_returns_ownership_using_default_when_bucket_mapping_does_not_match() {
accountId = RandomStringUtils.randomNumeric(12);
when(s3SourceConfig.isDisableBucketOwnershipValidation()).thenReturn(false);
when(s3SourceConfig.getBucketOwners()).thenReturn(Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
when(s3SourceConfig.getDefaultBucketOwner()).thenReturn(accountId);

BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

final String bucket = UUID.randomUUID().toString();
final Optional<String> optionalOwner = bucketOwnerProvider.getBucketOwner(bucket);

assertThat(optionalOwner, notNullValue());
assertThat(optionalOwner.isPresent(), equalTo(true));
assertThat(optionalOwner.get(), equalTo(accountId));
}
}

@Test
void createBucketOwnerProvider_returns_ownership_based_on_STS_role_ARN_when_no_SQS_queue() {
final String stsRoleArn = String.format("arn:aws:iam::%s:role/something", accountId);
AwsAuthenticationOptions awsAuthenticationOptions = mock(AwsAuthenticationOptions.class);
when(awsAuthenticationOptions.getAwsStsRoleArn()).thenReturn(stsRoleArn);
when(s3SourceConfig.getAwsAuthenticationOptions()).thenReturn(awsAuthenticationOptions);

when(s3SourceConfig.isDisableBucketOwnershipValidation()).thenReturn(false);

final BucketOwnerProvider bucketOwnerProvider = createObjectUnderTest().createBucketOwnerProvider(s3SourceConfig);

assertThat(bucketOwnerProvider, notNullValue());

Expand Down
Loading

0 comments on commit a19d71d

Please sign in to comment.