
Make S3 partition size configurable and add unit test for S3 partition creator classes (#4437)

* Make S3 partition size configurable and add unit test for S3 partition creator classes

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Rename export partition size to export batch size

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

---------

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
dinujoh authored Apr 18, 2024
1 parent 2859023 commit a17dbe5
Showing 9 changed files with 189 additions and 48 deletions.
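
For orientation, here is a minimal sketch of the two settings this commit makes configurable, using the CollectionConfig defaults shown in the first diff below. The import path is an assumption based on the plugin layout and is not part of this change.

// Sketch only: the import below assumes the plugin's package layout, which this page does not show.
import org.opensearch.dataprepper.plugins.mongo.configuration.CollectionConfig;

public class CollectionConfigDefaultsSketch {
    public static void main(String[] args) {
        // When partition_count / export_batch_size are omitted from the pipeline
        // configuration, the defaults introduced in this commit apply.
        final CollectionConfig config = new CollectionConfig();
        System.out.println(config.getPartitionCount());  // 100   (DEFAULT_PARTITION_COUNT)
        System.out.println(config.getExportBatchSize()); // 10000 (DEFAULT_EXPORT_BATCH_SIZE)
        System.out.println(config.getStreamBatchSize()); // 1000  (DEFAULT_STREAM_BATCH_SIZE)
    }
}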
CollectionConfig.java
@@ -9,12 +9,11 @@
public class CollectionConfig {
private static final String COLLECTION_SPLITTER = "\\.";
private static final int DEFAULT_STREAM_BATCH_SIZE = 1000;
private static final int DEFAULT_PARTITION_COUNT = 100;
private static final int DEFAULT_EXPORT_BATCH_SIZE = 10_000;
@JsonProperty("collection")
private @NotNull String collection;

@JsonProperty("export_config")
private ExportConfig exportConfig;

@JsonProperty("export")
private boolean export;

@@ -30,14 +29,20 @@ public class CollectionConfig {
@JsonProperty("s3_region")
private String s3Region;

@JsonProperty("partition_count")
private int partitionCount;

@JsonProperty("export_batch_size")
private int exportBatchSize;
@JsonProperty("stream_batch_size")
private int streamBatchSize;

public CollectionConfig() {
this.export = true;
this.stream = true;
this.exportConfig = new ExportConfig();
this.streamBatchSize = DEFAULT_STREAM_BATCH_SIZE;
this.partitionCount = DEFAULT_PARTITION_COUNT;
this.exportBatchSize = DEFAULT_EXPORT_BATCH_SIZE;
}

public String getCollection() {
@@ -68,28 +73,18 @@ public String getS3PathPrefix() {
return this.s3PathPrefix;
}

public int getPartitionCount() {
return this.partitionCount;
}

public int getExportBatchSize() {
return this.exportBatchSize;
}

public int getStreamBatchSize() {
return this.streamBatchSize;
}
public String getS3Region() {
return this.s3Region;
}

public ExportConfig getExportConfig() {
return this.exportConfig;
}

public static class ExportConfig {
private static final int DEFAULT_ITEMS_PER_PARTITION = 4000;
@JsonProperty("items_per_partition")
private Integer itemsPerPartition;

public ExportConfig() {
this.itemsPerPartition = DEFAULT_ITEMS_PER_PARTITION;
}

public Integer getItemsPerPartition() {
return this.itemsPerPartition;
}
}
}
S3FolderPartition.java
@@ -11,31 +11,34 @@
import java.util.Optional;

/**
* A S3 partition represents an S3 partition job to create S3 path prefix/sub folder that will
* A S3 Folder partition represents an S3 partition job to create S3 path prefix/sub folder that will
* be used to group records based on record key.
*/
public class S3FolderPartition extends EnhancedSourcePartition<String> {

public static final String PARTITION_TYPE = "S3_FOLDER";
private final String bucketName;
private final String subFolder;
private final String pathPrefix;
private final String region;
private final String collection;
private final int partitionCount;

public S3FolderPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
String[] keySplits = sourcePartitionStoreItem.getSourcePartitionKey().split("\\|");
collection = keySplits[0];
bucketName = keySplits[1];
subFolder = keySplits[2];
region = keySplits[3];
pathPrefix = keySplits[2];
partitionCount = Integer.parseInt(keySplits[3]);
region = keySplits[4];
}

public S3FolderPartition(final String bucketName, final String subFolder, final String region, final String collection) {
public S3FolderPartition(final String bucketName, final String pathPrefix, final String region, final String collection, final int partitionCount) {
this.bucketName = bucketName;
this.subFolder = subFolder;
this.pathPrefix = pathPrefix;
this.region = region;
this.collection = collection;
this.partitionCount = partitionCount;
}

@Override
@@ -45,7 +48,7 @@ public String getPartitionType() {

@Override
public String getPartitionKey() {
return collection + "|" + bucketName + "|" + subFolder + "|" + region;
return collection + "|" + bucketName + "|" + pathPrefix + "|" + partitionCount + "|" + region;
}

@Override
@@ -58,8 +61,8 @@ public String getBucketName() {
return bucketName;
}

public String getSubFolder() {
return subFolder;
public String getPathPrefix() {
return pathPrefix;
}

public String getRegion() {
@@ -69,4 +72,8 @@ public String getRegion() {
public String getCollection() {
return collection;
}

public int getPartitionCount() {
return partitionCount;
}
}
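
As a quick illustration of the new five-part partition key built by getPartitionKey() above, here is a standalone sketch (all values invented) that assembles a key and splits it the same way the store-item constructor does.

public class S3FolderPartitionKeySketch {
    public static void main(String[] args) {
        // Key layout after this change: collection|bucketName|pathPrefix|partitionCount|region
        final String partitionKey = String.join("|",
                "demo-db.demo-collection", "demo-bucket", "mongo/export", "100", "us-east-1");

        final String[] keySplits = partitionKey.split("\\|");
        System.out.println("collection     = " + keySplits[0]);
        System.out.println("bucketName     = " + keySplits[1]);
        System.out.println("pathPrefix     = " + keySplits[2]);
        System.out.println("partitionCount = " + Integer.parseInt(keySplits[3]));
        System.out.println("region         = " + keySplits[4]);
    }
}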
LeaderScheduler.java
@@ -140,7 +140,7 @@ private void init() {
private void createS3Partition(final CollectionConfig collectionConfig) {
LOG.info("Creating s3 folder global partition: {}", collectionConfig.getCollection());
coordinator.createPartition(new S3FolderPartition(collectionConfig.getS3Bucket(), collectionConfig.getS3PathPrefix(),
collectionConfig.getS3Region(), collectionConfig.getCollection()));
collectionConfig.getS3Region(), collectionConfig.getCollection(), collectionConfig.getPartitionCount()));
}

/**
@@ -171,7 +171,7 @@ private void createExportPartition(final CollectionConfig collectionConfig, fina
exportProgressState.setDatabaseName(collectionConfig.getDatabaseName());
exportProgressState.setExportTime(exportTime.toString()); // information purpose
final ExportPartition exportPartition = new ExportPartition(collectionConfig.getCollection(),
collectionConfig.getExportConfig().getItemsPerPartition(), exportTime, exportProgressState);
collectionConfig.getExportBatchSize(), exportTime, exportProgressState);
coordinator.createPartition(exportPartition);
}

S3PartitionCreator.java
@@ -7,15 +7,15 @@

public class S3PartitionCreator {
private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreator.class);
private final int partitionSize;
private final int partitionCount;

S3PartitionCreator(final int partitionSize) {
this.partitionSize = partitionSize;
S3PartitionCreator(final int partitionCount) {
this.partitionCount = partitionCount;
}

List<String> createPartition() {
final List<String> partitions = new ArrayList<>();
for (int i = 0; i < partitionSize; i++) {
for (int i = 0; i < partitionCount; i++) {
String partitionName = String.format("%02x", i) + "/";
partitions.add(partitionName);
}
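
To make the effect of the configurable partition count concrete, here is a standalone sketch of the same naming loop; a count of 4 yields the folder names 00/, 01/, 02/ and 03/.

import java.util.ArrayList;
import java.util.List;

public class PartitionNamingSketch {
    public static void main(String[] args) {
        final int partitionCount = 4; // now supplied via the collection's partition_count setting
        final List<String> partitions = new ArrayList<>();
        for (int i = 0; i < partitionCount; i++) {
            // Two-digit, zero-padded hex folder name, matching S3PartitionCreator above
            partitions.add(String.format("%02x", i) + "/");
        }
        System.out.println(partitions); // [00/, 01/, 02/, 03/]
    }
}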
S3PartitionCreatorScheduler.java
@@ -16,7 +16,6 @@ public class S3PartitionCreatorScheduler extends S3FolderPartitionCoordinator im
private static final Logger LOG = LoggerFactory.getLogger(S3PartitionCreatorScheduler.class);
public static final String S3_FOLDER_PREFIX = "S3-FOLDER-";
private static final int DEFAULT_TAKE_LEASE_INTERVAL_MILLIS = 60_000;
private static final int DEFAULT_S3_PARTITION_SIZE = 50;
private final EnhancedSourceCoordinator sourceCoordinator;
private final List<String> collections;
public S3PartitionCreatorScheduler(final EnhancedSourceCoordinator sourceCoordinator,
@@ -33,7 +32,7 @@ public void run() {
final Optional<EnhancedSourcePartition> sourcePartition = sourceCoordinator.acquireAvailablePartition(S3FolderPartition.PARTITION_TYPE);
if (sourcePartition.isPresent()) {
final S3FolderPartition s3FolderPartition = (S3FolderPartition) sourcePartition.get();
final List<String> s3Folders = createS3BucketPartitions();
final List<String> s3Folders = createS3BucketPartitions(s3FolderPartition.getPartitionCount());
sourceCoordinator.completePartition(s3FolderPartition);
final S3PartitionStatus s3PartitionStatus = new S3PartitionStatus(s3Folders);
sourceCoordinator.createPartition(new GlobalState(S3_FOLDER_PREFIX + s3FolderPartition.getCollection(), s3PartitionStatus.toMap()));
@@ -72,8 +71,8 @@ public void run() {
LOG.warn("S3 partition creator scheduler interrupted, looks like shutdown has triggered");
}

private List<String> createS3BucketPartitions() {
final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(DEFAULT_S3_PARTITION_SIZE);
private List<String> createS3BucketPartitions(int partitionCount) {
final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(partitionCount);
return s3PartitionCreator.createPartition();
}
}
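
For reference, a sketch of the progress-state shape the scheduler persists per collection; the "partitions" key is taken from the new S3FolderPartitionCoordinatorTest below, and the rest is illustrative.

import java.util.List;
import java.util.Map;

public class S3FolderGlobalStateSketch {
    public static void main(String[] args) {
        // Folder names produced by S3PartitionCreator for one collection
        final List<String> s3Folders = List.of("00/", "01/", "02/");
        // The scheduler stores this map as the progress state of a GlobalState
        // partition keyed by "S3-FOLDER-" + collection.
        final Map<String, Object> progressState = Map.of("partitions", s3Folders);
        System.out.println(progressState); // {partitions=[00/, 01/, 02/]}
    }
}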
LeaderSchedulerTest.java
@@ -38,9 +38,6 @@ public class LeaderSchedulerTest {
@Mock
private CollectionConfig collectionConfig;

@Mock
private CollectionConfig.ExportConfig exportConfig;

private LeaderScheduler leaderScheduler;
private LeaderPartition leaderPartition;

@@ -65,8 +62,7 @@ void test_should_init() {
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
given(collectionConfig.isExport()).willReturn(true);
given(collectionConfig.isStream()).willReturn(true);
given(collectionConfig.getExportConfig()).willReturn(exportConfig);
given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt());
given(collectionConfig.getExportBatchSize()).willReturn(Math.abs(new Random().nextInt()));
given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString());

final ExecutorService executorService = Executors.newSingleThreadExecutor();
@@ -98,8 +94,7 @@ void test_should_init_export() {
leaderPartition = new LeaderPartition();
given(coordinator.acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)).willReturn(Optional.of(leaderPartition));
given(collectionConfig.isExport()).willReturn(true);
given(collectionConfig.getExportConfig()).willReturn(exportConfig);
given(exportConfig.getItemsPerPartition()).willReturn(new Random().nextInt());
given(collectionConfig.getExportBatchSize()).willReturn(Math.abs(new Random().nextInt()));
given(collectionConfig.getCollection()).willReturn(UUID.randomUUID().toString());

final ExecutorService executorService = Executors.newSingleThreadExecutor();
S3FolderPartitionCoordinatorTest.java
@@ -0,0 +1,50 @@
package org.opensearch.dataprepper.plugins.mongo.s3partition;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.model.S3PartitionStatus;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class S3FolderPartitionCoordinatorTest {
@Mock
private EnhancedSourceCoordinator sourceCoordinator;

@InjectMocks
private S3FolderPartitionCoordinator s3FolderPartitionCoordinator;

@Test
public void getGlobalS3FolderCreationStatus_empty() {
final String collection = UUID.randomUUID().toString();
when(sourceCoordinator.getPartition(S3PartitionCreatorScheduler.S3_FOLDER_PREFIX + collection)).thenReturn(Optional.empty());
Optional<S3PartitionStatus> partitionStatus = s3FolderPartitionCoordinator.getGlobalS3FolderCreationStatus(collection);
assertThat(partitionStatus.isEmpty(), is(true));
}

@Test
public void getGlobalS3FolderCreationStatus_nonEmpty() {
final String collection = UUID.randomUUID().toString();
final List<String> partitions = List.of(UUID.randomUUID().toString(), UUID.randomUUID().toString());
final GlobalState globalState = mock(GlobalState.class);
final Map<String, Object> props = Map.of("partitions", partitions);
when(globalState.getProgressState()).thenReturn(Optional.of(props));
when(sourceCoordinator.getPartition(S3PartitionCreatorScheduler.S3_FOLDER_PREFIX + collection)).thenReturn(Optional.of(globalState));
Optional<S3PartitionStatus> partitionStatus = s3FolderPartitionCoordinator.getGlobalS3FolderCreationStatus(collection);
assertThat(partitionStatus.isEmpty(), is(false));
assertThat(partitionStatus.get().getPartitions(), is(partitions));
}
}
S3PartitionCreatorSchedulerTest.java
@@ -0,0 +1,75 @@
package org.opensearch.dataprepper.plugins.mongo.s3partition;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.S3FolderPartition;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.opensearch.dataprepper.plugins.mongo.s3partition.S3PartitionCreatorScheduler.S3_FOLDER_PREFIX;

@ExtendWith(MockitoExtension.class)
public class S3PartitionCreatorSchedulerTest {
@Mock
private EnhancedSourceCoordinator coordinator;
private S3PartitionCreatorScheduler s3PartitionCreatorScheduler;

@BeforeEach
public void setup() {
s3PartitionCreatorScheduler = new S3PartitionCreatorScheduler(coordinator, List.of(UUID.randomUUID().toString()));
}

@Test
void test_S3FolderPartition_empty() {
given(coordinator.acquireAvailablePartition(S3FolderPartition.PARTITION_TYPE)).willReturn(Optional.empty());
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> s3PartitionCreatorScheduler.run());
await()
.atMost(Duration.ofSeconds(2))
.untilAsserted(() -> verify(coordinator, never()).completePartition(any(EnhancedSourcePartition.class)));
await()
.atMost(Duration.ofSeconds(2))
.untilAsserted(() -> verify(coordinator, never()).createPartition(any(EnhancedSourcePartition.class)));
executorService.shutdownNow();
}

@Test
void test_S3FolderPartition_exist() {
final S3FolderPartition s3FolderPartition = mock(S3FolderPartition.class);
given(s3FolderPartition.getPartitionCount()).willReturn(Math.abs(new Random().nextInt(100)));
given(s3FolderPartition.getCollection()).willReturn(UUID.randomUUID().toString());
given(coordinator.acquireAvailablePartition(S3FolderPartition.PARTITION_TYPE)).willReturn(Optional.of(s3FolderPartition));
s3PartitionCreatorScheduler.run();
verify(coordinator).completePartition(s3FolderPartition);
final ArgumentCaptor<GlobalState> argumentCaptor = ArgumentCaptor.forClass(GlobalState.class);
verify(coordinator).createPartition(argumentCaptor.capture());
final GlobalState globalState = argumentCaptor.getValue();
assertThat(globalState.getPartitionKey(), is(S3_FOLDER_PREFIX + s3FolderPartition.getCollection()));
assertThat(globalState.getProgressState().get(), hasKey("partitions"));
final List<String> partitions = (List<String>) globalState.getProgressState().get().get("partitions");
assertThat(partitions, hasSize(s3FolderPartition.getPartitionCount()));
}
}
S3PartitionCreatorTest.java
@@ -0,0 +1,20 @@
package org.opensearch.dataprepper.plugins.mongo.s3partition;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Random;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;

public class S3PartitionCreatorTest {

@Test
public void createPartitionTest() {
final int partitionCount = Math.abs(new Random().nextInt(1000));
final S3PartitionCreator s3PartitionCreator = new S3PartitionCreator(partitionCount);
final List<String> partitions = s3PartitionCreator.createPartition();
assertThat(partitions, hasSize(partitionCount));
}
}
