Changes from all commits

25 commits
d963a48
6471 Set up test for simultaneous processes splitting partition tree
patchwork01 Jan 27, 2026
cca51a2
6471: Update BulkImportJobDriver to retry partition splits
Rob9786 Jan 27, 2026
e4e54ee
6471 Extract PartitionPreSplitter
patchwork01 Jan 27, 2026
48bd211
6471: Added test to check nothings happened
Rob9786 Jan 27, 2026
f2fde56
6471 Move assertion data to assertion
patchwork01 Jan 27, 2026
3e014ed
6471 Tidy tests
patchwork01 Jan 27, 2026
83e0863
6471 Test when not enough data is present
patchwork01 Jan 27, 2026
b405016
6471 Add test stub to limit number of retries
patchwork01 Jan 27, 2026
0f97207
6471 Test for retrying partition splitting
patchwork01 Jan 27, 2026
db366a0
6471: Removed test checking if partitions split by another process
Rob9786 Jan 27, 2026
7a62b9b
6471: Added new bulk import table property
Rob9786 Jan 27, 2026
984550c
6471 Adjust property description
patchwork01 Jan 27, 2026
9bf5d82
6471: Added loop count
Rob9786 Jan 29, 2026
44e9fdb
6471: Added retry limit test
Rob9786 Jan 29, 2026
7817ca1
Merge branch 'develop' into 6471-extend-partition-tree-conflict
Rob9786 Jan 30, 2026
4ea2d76
6471: Update to call to FixedStateStoreProvider
Rob9786 Jan 30, 2026
38a084c
6471: Updated comments after review
Rob9786 Feb 2, 2026
461d380
6471: Updates to partitionsRepresentation
Rob9786 Feb 2, 2026
7994621
6471: Add disabled flag to test
Rob9786 Feb 3, 2026
dd431c3
Merge remote-tracking branch 'origin/develop' into 6471-extend-partit…
patchwork01 Feb 4, 2026
fac976e
6471 Fix test extending tree from other process
patchwork01 Feb 4, 2026
f3bc758
6471 Tidy partitions representation code
patchwork01 Feb 4, 2026
e58986e
6471 Test only retrying once
patchwork01 Feb 4, 2026
c388b02
6471 Refactor faking other process extending partition tree
patchwork01 Feb 4, 2026
080a604
6471 Refactor tests further
patchwork01 Feb 4, 2026
@@ -28,6 +28,7 @@ The following instance properties relate to default values used by table properties
| sleeper.default.table.statestore.transactionlog.delete.behind.snapshot.min.age.minutes | The minimum age in minutes of a snapshot in order to allow deletion of transactions leading up to it. When deleting old transactions, there's a chance that processes may still read transactions starting from an older snapshot. We need to avoid deletion of any transactions associated with a snapshot that may still be used as the starting point for reading the log. | 2 | false |
| sleeper.default.table.statestore.transactionlog.delete.number.behind.latest.snapshot | The minimum number of transactions that a transaction must be behind the latest snapshot before being deleted. This is the number of transactions that will be kept and protected from deletion, whenever old transactions are deleted. This includes the transaction that the latest snapshot was created against. Any transactions after the snapshot will never be deleted as they are still in active use.<br>This should be configured in relation to the property which determines whether a process will load the latest snapshot or instead seek through the transaction log, since we need to preserve transactions that may still be read:<br>sleeper.default.statestore.snapshot.load.min.transactions.ahead<br>The snapshot that will be considered the latest snapshot is configured by a property to set the minimum age for it to count for this:<br>sleeper.default.statestore.transactionlog.delete.behind.snapshot.min.age<br> | 200 | false |
| sleeper.default.table.bulk.import.min.leaf.partitions | Specifies the minimum number of leaf partitions that are needed to run a bulk import job. If this minimum has not been reached, bulk import jobs will refuse to start. | 256 | false |
| sleeper.default.table.bulk.import.partition.splitting.attempts | Specifies the number of times a bulk import job will attempt to create leaf partitions to meet the minimum number of leaf partitions. The split will be retried if another process splits the same partitions at the same time. | 3 | false |
| sleeper.default.table.ingest.batcher.job.min.size | Specifies the minimum total file size required for an ingest job to be batched and sent. An ingest job will be created if the batcher runs while this much data is waiting, and the minimum number of files is also met. | 1G | false |
| sleeper.default.table.ingest.batcher.job.max.size | Specifies the maximum total file size for a job in the ingest batcher. If more data is waiting than this, it will be split into multiple jobs. If a single file exceeds this, it will still be ingested in its own job. It's also possible some data may be left for a future run of the batcher if some recent files overflow the size of a job but aren't enough to create a job on their own. | 5G | false |
| sleeper.default.table.ingest.batcher.job.min.files | Specifies the minimum number of files for a job in the ingest batcher. An ingest job will be created if the batcher runs while this many files are waiting, and the minimum size of files is also met. | 1 | false |
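The new instance-level default above pairs with the per-table property of the same name (without the `default.` segment), which takes precedence when set on a table. As a rough illustration of how the two might be combined, the value 5 below is purely an example and not a recommendation:

```properties
# instance.properties - instance-wide default applied to all tables
sleeper.default.table.bulk.import.partition.splitting.attempts=3

# table.properties - per-table setting, overrides the instance-wide default
sleeper.table.bulk.import.partition.splitting.attempts=5
```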
1 change: 1 addition & 0 deletions docs/usage/properties/table/bulk_import.md
@@ -14,4 +14,5 @@ The following table properties relate to bulk import, i.e. ingesting data using
| sleeper.table.bulk.import.emr.executor.max.capacity | (Non-persistent EMR mode only) The maximum number of capacity units to provision as EC2 instances for executors in the EMR cluster.<br>This is measured in instance fleet capacity units. These are declared alongside the requested instance types, as each type will count for a certain number of units. By default the units are the number of instances.<br>This value overrides the default value in the instance properties. It can be overridden by a value in the bulk import job specification. | 10 |
| sleeper.table.bulk.import.emr.release.label | (Non-persistent EMR mode only) The EMR release label to be used when creating an EMR cluster for bulk importing data using Spark running on EMR.<br>This value overrides the default value in the instance properties. It can be overridden by a value in the bulk import job specification. | emr-7.12.0 |
| sleeper.table.bulk.import.min.leaf.partitions | Specifies the minimum number of leaf partitions that are needed to run a bulk import job. If this minimum has not been reached, bulk import jobs will refuse to start | 256 |
| sleeper.table.bulk.import.partition.splitting.attempts | Specifies the number of times a bulk import job will attempt to create leaf partitions to meet the minimum number of leaf partitions. The split will be retried if another process splits the same partitions at the same time. | 3 |
| sleeper.table.bulk.import.job.files.commit.async | If true, bulk import will add files via requests sent to the state store committer lambda asynchronously. If false, bulk import will commit new files at the end of the job synchronously.<br>This is only applied if async commits are enabled for the table. The default value is set in an instance property. | true |
6 changes: 6 additions & 0 deletions example/full/instance.properties
@@ -1915,6 +1915,12 @@ sleeper.logging.root.level=INFO
# (default value shown below, uncomment to set a value)
# sleeper.default.table.bulk.import.min.leaf.partitions=256

# Specifies the number of times a bulk import job will attempt to create leaf partitions to meet the
# minimum number of leaf partitions. The split will be retried if another process splits the same
# partitions at the same time.
# (default value shown below, uncomment to set a value)
# sleeper.default.table.bulk.import.partition.splitting.attempts=3

# Specifies the minimum total file size required for an ingest job to be batched and sent. An ingest
# job will be created if the batcher runs while this much data is waiting, and the minimum number of
# files is also met.
6 changes: 6 additions & 0 deletions example/full/table.properties
@@ -503,6 +503,12 @@ sleeper.table.statestore.classname=DynamoDBTransactionLogStateStore
# (default value shown below, uncomment to set a value)
# sleeper.table.bulk.import.min.leaf.partitions=256

# Specifies the number of times a bulk import job will attempt to create leaf partitions to meet the
# minimum number of leaf partitions. The split will be retried if another process splits the same
# partitions at the same time.
# (default value shown below, uncomment to set a value)
# sleeper.table.bulk.import.partition.splitting.attempts=3

# If true, bulk import will add files via requests sent to the state store committer lambda
# asynchronously. If false, bulk import will commit new files at the end of the job synchronously.
# This is only applied if async commits are enabled for the table. The default value is set in an
@@ -31,7 +31,6 @@
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;
import sleeper.core.partition.Partition;
import sleeper.core.partition.PartitionTree;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
@@ -53,7 +52,6 @@
import sleeper.core.util.LoggedDuration;
import sleeper.ingest.tracker.job.IngestJobTrackerFactory;
import sleeper.sketches.Sketches;
import sleeper.splitter.core.extend.ExtendPartitionTreeBasedOnSketches;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.commit.SqsFifoStateStoreCommitRequestSender;

@@ -70,7 +68,6 @@
import java.util.function.Supplier;

import static sleeper.core.properties.table.TableProperty.BULK_IMPORT_FILES_COMMIT_ASYNC;
import static sleeper.core.properties.table.TableProperty.BULK_IMPORT_MIN_LEAF_PARTITION_COUNT;

/**
* Executes a Spark job that reads input Parquet files and writes to a Sleeper table. This takes a
@@ -83,14 +80,13 @@ public class BulkImportJobDriver<C extends BulkImportContext<C>> {
private static final Logger LOGGER = LoggerFactory.getLogger(BulkImportJobDriver.class);

private final ContextCreator<C> contextCreator;
private final DataSketcher<C> dataSketcher;
private final PartitionPreSplitter<C> preSplitter;
private final BulkImporter<C> bulkImporter;
private final TablePropertiesProvider tablePropertiesProvider;
private final StateStoreProvider stateStoreProvider;
private final IngestJobTracker tracker;
private final StateStoreCommitRequestSender asyncSender;
private final Supplier<Instant> getTime;
private final Supplier<String> partitionIdSupplier;

public BulkImportJobDriver(
ContextCreator<C> contextCreator,
@@ -103,14 +99,13 @@ public BulkImportJobDriver(
Supplier<Instant> getTime,
Supplier<String> partitionIdSupplier) {
this.contextCreator = contextCreator;
this.dataSketcher = dataSketcher;
this.preSplitter = new PartitionPreSplitter<>(dataSketcher, stateStoreProvider, partitionIdSupplier);
this.bulkImporter = bulkImporter;
this.tablePropertiesProvider = tablePropertiesProvider;
this.stateStoreProvider = stateStoreProvider;
this.tracker = tracker;
this.asyncSender = asyncSender;
this.getTime = getTime;
this.partitionIdSupplier = partitionIdSupplier;
}

public void run(BulkImportJob job, String jobRunId, String taskId) throws IOException {
@@ -129,7 +124,7 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOException {
// Note that we stop the Spark context after we've applied the changes in Sleeper.
try (C context = contextCreator.createContext(tableProperties, allPartitions, job)) {

C contextAfterSplit = preSplitPartitionsIfNecessary(tableProperties, allPartitions, context);
C contextAfterSplit = preSplitter.preSplitPartitionsIfNecessary(tableProperties, allPartitions, context);

Instant startTime = getTime.get();
tracker.jobStarted(IngestJobStartedEvent.builder()
@@ -153,24 +148,6 @@ public void run(BulkImportJob job, String jobRunId, String taskId) throws IOException {
}
}

private C preSplitPartitionsIfNecessary(TableProperties tableProperties, List<Partition> allPartitions, C context) {
PartitionTree tree = new PartitionTree(allPartitions);
List<Partition> leafPartitions = tree.getLeafPartitions();
int minLeafPartitions = tableProperties.getInt(BULK_IMPORT_MIN_LEAF_PARTITION_COUNT);
if (leafPartitions.size() < minLeafPartitions) {
LOGGER.info("Extending partition tree from {} leaf partitions to {}", leafPartitions.size(), minLeafPartitions);
Map<String, Sketches> partitionIdToSketches = dataSketcher.generatePartitionIdToSketches(context);
StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
ExtendPartitionTreeBasedOnSketches.forBulkImport(tableProperties, partitionIdSupplier)
.createTransaction(tree, partitionIdToSketches)
.synchronousCommit(stateStore);
return context.withPartitions(stateStore.getAllPartitions());
} else {
LOGGER.info("Partition tree meets minimum of {} leaf partitions", minLeafPartitions);
return context;
}
}

private void commitSuccessfulJob(TableProperties tableProperties, IngestJobRunIds runIds, Instant startTime, List<FileReference> fileReferences) {

Instant finishTime = getTime.get();
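The extracted `PartitionPreSplitter` class itself is not shown in this diff view, only its construction and use in `BulkImportJobDriver`. The following is a minimal sketch of how the retry behaviour described by `sleeper.table.bulk.import.partition.splitting.attempts` could be wired around the logic removed from the driver above. The class name, constructor arguments and method name are taken from the driver changes; the property constant name `BULK_IMPORT_PARTITION_SPLITTING_ATTEMPTS` and the treatment of a conflicting split as a caught commit failure are assumptions, not taken from the PR.

```java
// Sketch only - not the PR's actual PartitionPreSplitter. Imports match those already
// used in BulkImportJobDriver above and are omitted here for brevity.
public class PartitionPreSplitter<C extends BulkImportContext<C>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionPreSplitter.class);

    private final DataSketcher<C> dataSketcher;
    private final StateStoreProvider stateStoreProvider;
    private final Supplier<String> partitionIdSupplier;

    public PartitionPreSplitter(DataSketcher<C> dataSketcher, StateStoreProvider stateStoreProvider, Supplier<String> partitionIdSupplier) {
        this.dataSketcher = dataSketcher;
        this.stateStoreProvider = stateStoreProvider;
        this.partitionIdSupplier = partitionIdSupplier;
    }

    public C preSplitPartitionsIfNecessary(TableProperties tableProperties, List<Partition> allPartitions, C context) {
        int minLeafPartitions = tableProperties.getInt(BULK_IMPORT_MIN_LEAF_PARTITION_COUNT);
        // Assumed constant name for sleeper.table.bulk.import.partition.splitting.attempts
        int maxAttempts = tableProperties.getInt(BULK_IMPORT_PARTITION_SPLITTING_ATTEMPTS);
        StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
        List<Partition> partitions = allPartitions;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            PartitionTree tree = new PartitionTree(partitions);
            int leafCount = tree.getLeafPartitions().size();
            if (leafCount >= minLeafPartitions) {
                LOGGER.info("Partition tree meets minimum of {} leaf partitions", minLeafPartitions);
                return context.withPartitions(partitions);
            }
            LOGGER.info("Extending partition tree from {} leaf partitions to {} (attempt {} of {})",
                    leafCount, minLeafPartitions, attempt, maxAttempts);
            Map<String, Sketches> partitionIdToSketches = dataSketcher.generatePartitionIdToSketches(context);
            try {
                ExtendPartitionTreeBasedOnSketches.forBulkImport(tableProperties, partitionIdSupplier)
                        .createTransaction(tree, partitionIdToSketches)
                        .synchronousCommit(stateStore);
                return context.withPartitions(stateStore.getAllPartitions());
            } catch (RuntimeException e) {
                // Assumption: a split applied concurrently by another process surfaces as a
                // failed commit. Reload the partitions and retry against the updated tree.
                LOGGER.warn("Partition pre-split conflicted with another process, retrying", e);
                partitions = stateStore.getAllPartitions();
            }
        }
        // Attempts exhausted: continue the import against whatever partitions now exist.
        return context.withPartitions(partitions);
    }
}
```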