Skip to content

Round targetPartitionSizeInBytes to a multiple of minTargetPartitionSizeInBytes #17834

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class ArbitraryDistributionSplitAssigner
private int nextPartitionId;
private int adaptiveCounter;
private long targetPartitionSizeInBytes;
private long roundedTargetPartitionSizeInBytes;
private final List<PartitionAssignment> allAssignments = new ArrayList<>();
private final Map<Optional<HostAddress>, PartitionAssignment> openAssignments = new HashMap<>();

Expand Down Expand Up @@ -94,6 +95,7 @@ class ArbitraryDistributionSplitAssigner
this.maxTaskSplitCount = maxTaskSplitCount;

this.targetPartitionSizeInBytes = minTargetPartitionSizeInBytes;
this.roundedTargetPartitionSizeInBytes = minTargetPartitionSizeInBytes;
}

@Override
Expand Down Expand Up @@ -200,7 +202,7 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Spl
Optional<HostAddress> hostRequirement = getHostRequirement(split);
PartitionAssignment partitionAssignment = openAssignments.get(hostRequirement);
long splitSizeInBytes = getSplitSizeInBytes(split);
if (partitionAssignment != null && ((partitionAssignment.getAssignedDataSizeInBytes() + splitSizeInBytes > targetPartitionSizeInBytes)
if (partitionAssignment != null && ((partitionAssignment.getAssignedDataSizeInBytes() + splitSizeInBytes > roundedTargetPartitionSizeInBytes)
|| (partitionAssignment.getAssignedSplitCount() + 1 > maxTaskSplitCount))) {
partitionAssignment.setFull(true);
for (PlanNodeId partitionedSourceNodeId : partitionedSources) {
Expand All @@ -221,7 +223,8 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List<Spl
if (adaptiveCounter >= adaptiveGrowthPeriod) {
targetPartitionSizeInBytes = (long) min(maxTargetPartitionSizeInBytes, ceil(targetPartitionSizeInBytes * adaptiveGrowthFactor));
// round to a multiple of minTargetPartitionSizeInBytes so work will be evenly distributed among drivers of a task
targetPartitionSizeInBytes = (targetPartitionSizeInBytes + minTargetPartitionSizeInBytes - 1) / minTargetPartitionSizeInBytes * minTargetPartitionSizeInBytes;
roundedTargetPartitionSizeInBytes = round(targetPartitionSizeInBytes * 1.0 / minTargetPartitionSizeInBytes) * minTargetPartitionSizeInBytes;
verify(roundedTargetPartitionSizeInBytes > 0, "roundedTargetPartitionSizeInBytes %s not positive", roundedTargetPartitionSizeInBytes);
adaptiveCounter = 0;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,86 @@ public void testAdaptiveTaskSizing()
.build());
}

@Test
public void testAdaptiveTaskSizingRounding()
{
Set<PlanNodeId> partitionedSources = ImmutableSet.of(PARTITIONED_1);
List<SplitBatch> batches = ImmutableList.of(
new SplitBatch(PARTITIONED_1, ImmutableList.of(createSplit(1), createSplit(2), createSplit(3)), false),
new SplitBatch(PARTITIONED_1, ImmutableList.of(createSplit(4), createSplit(5), createSplit(6)), false),
new SplitBatch(PARTITIONED_1, ImmutableList.of(createSplit(7), createSplit(8), createSplit(9)), true));
SplitAssigner splitAssigner = new ArbitraryDistributionSplitAssigner(
Optional.of(TEST_CATALOG_HANDLE),
partitionedSources,
ImmutableSet.of(),
1,
1.3,
100,
400,
100,
5);
SplitAssignerTester tester = new SplitAssignerTester();
for (SplitBatch batch : batches) {
PlanNodeId planNodeId = batch.getPlanNodeId();
List<Split> splits = batch.getSplits();
boolean noMoreSplits = batch.isNoMoreSplits();
tester.update(splitAssigner.assign(planNodeId, createSplitsMultimap(splits), noMoreSplits));
tester.checkContainsSplits(planNodeId, splits, false);
}
tester.update(splitAssigner.finish());
List<TaskDescriptor> taskDescriptors = tester.getTaskDescriptors().orElseThrow();
assertThat(taskDescriptors).hasSize(5);

// target size 100, round to 100
TaskDescriptor taskDescriptor0 = taskDescriptors.get(0);
assertTaskDescriptor(
taskDescriptor0,
taskDescriptor0.getPartitionId(),
ImmutableListMultimap.<PlanNodeId, Split>builder()
.put(PARTITIONED_1, createSplit(1))
.build());

// target size 130, round to 100
TaskDescriptor taskDescriptor1 = taskDescriptors.get(1);
assertTaskDescriptor(
taskDescriptor1,
taskDescriptor1.getPartitionId(),
ImmutableListMultimap.<PlanNodeId, Split>builder()
.put(PARTITIONED_1, createSplit(2))
.build());

// target size 169, round to 200
TaskDescriptor taskDescriptor2 = taskDescriptors.get(2);
assertTaskDescriptor(
taskDescriptor2,
taskDescriptor2.getPartitionId(),
ImmutableListMultimap.<PlanNodeId, Split>builder()
.put(PARTITIONED_1, createSplit(3))
.put(PARTITIONED_1, createSplit(4))
.build());

// target size 220, round to 200
TaskDescriptor taskDescriptor3 = taskDescriptors.get(3);
assertTaskDescriptor(
taskDescriptor3,
taskDescriptor3.getPartitionId(),
ImmutableListMultimap.<PlanNodeId, Split>builder()
.put(PARTITIONED_1, createSplit(5))
.put(PARTITIONED_1, createSplit(6))
.build());

// target size 286, round to 300
TaskDescriptor taskDescriptor4 = taskDescriptors.get(4);
assertTaskDescriptor(
taskDescriptor4,
taskDescriptor4.getPartitionId(),
ImmutableListMultimap.<PlanNodeId, Split>builder()
.put(PARTITIONED_1, createSplit(7))
.put(PARTITIONED_1, createSplit(8))
.put(PARTITIONED_1, createSplit(9))
.build());
}

private void fuzzTesting(boolean withHostRequirements)
{
Set<PlanNodeId> partitionedSources = new HashSet<>();
Expand Down