
Commit 311bd5e

[Spark] Remove sorts from DELETE FROM queries that read a single partition of already-sorted data. (apache#45) (apache#49)
(cherry picked from commit 07cc6e4)
Parent: 2e125a5 · Commit: 311bd5e

8 files changed: +456 −7 lines

core/src/main/java/org/apache/iceberg/BaseScan.java (+2 −1)

@@ -47,7 +47,8 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
           "record_count",
           "partition",
           "key_metadata",
-          "split_offsets");
+          "split_offsets",
+          "sort_order_id");

   private static final List<String> STATS_COLUMNS =
       ImmutableList.of(
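Projecting "sort_order_id" into the scan columns means each planned data file carries its sort order id, which the write-side check in SparkWriteBuilder relies on. A minimal sketch of how a caller could inspect it; the table handle is assumed to be loaded elsewhere (e.g. from a catalog) and is illustrative only:

    import java.io.IOException;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class SortOrderIdInspector {
      static void printSortOrderIds(Table table) throws IOException {
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            // Available because "sort_order_id" is now part of the projected manifest columns;
            // may be null for files written without a sort order.
            Integer sortOrderId = task.file().sortOrderId();
            System.out.println(task.file().path() + " -> sortOrderId=" + sortOrderId);
          }
        }
      }
    }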

core/src/main/java/org/apache/iceberg/TableProperties.java (+3 −0)

@@ -216,6 +216,9 @@ private TableProperties() {}
   public static final String SPLIT_OPEN_FILE_COST = "read.split.open-file-cost";
   public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB

+  public static final String FILE_AS_SPLIT = "read.split.file-as-split";
+  public static final boolean FILE_AS_SPLIT_DEFAULT = false;
+
   public static final String PARQUET_VECTORIZATION_ENABLED = "read.parquet.vectorization.enabled";
   public static final boolean PARQUET_VECTORIZATION_ENABLED_DEFAULT = true;
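The new read.split.file-as-split property can be set like any other Iceberg table property. A minimal sketch, assuming a Table handle obtained elsewhere:

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;

    class EnableFileAsSplit {
      static void enable(Table table) {
        // Persist read.split.file-as-split=true so copy-on-write scans plan one file per split.
        table.updateProperties()
            .set(TableProperties.FILE_AS_SPLIT, "true")
            .commit();
      }
    }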

spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestPinterestCopyOnWriteDelete.java (+377 −0)

New test file; the large diff is not rendered here.

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java (+9 −0)

@@ -179,6 +179,15 @@ public long splitOpenFileCost() {
         .parse();
   }

+  public boolean fileAsSplit() {
+    return confParser
+        .booleanConf()
+        .option(SparkReadOptions.FILE_AS_SPLIT)
+        .tableProperty(TableProperties.FILE_AS_SPLIT)
+        .defaultValue(TableProperties.FILE_AS_SPLIT_DEFAULT)
+        .parse();
+  }
+
   /**
    * Enables reading a timestamp without time zone as a timestamp with time zone.
    *

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java (+3 −0)

@@ -47,6 +47,9 @@ private SparkReadOptions() {}
   // Overrides the table's read.split.planning-lookback
   public static final String LOOKBACK = "lookback";

+  // Use input file as one split
+  public static final String FILE_AS_SPLIT = "file-as-split";
+
   // Overrides the table's read.split.open-file-cost
   public static final String FILE_OPEN_COST = "file-open-cost";
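Because SparkReadConf resolves the read option before the table property, the behavior can also be toggled per scan. A minimal sketch for a plain DataFrame read; the table identifier is hypothetical:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class FileAsSplitRead {
      static Dataset<Row> read(SparkSession spark) {
        // The "file-as-split" read option overrides read.split.file-as-split for this scan only.
        return spark.read()
            .format("iceberg")
            .option("file-as-split", "true")
            .load("db.sample_table"); // hypothetical table identifier
      }
    }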

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java (+20 −6)

@@ -24,6 +24,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -51,6 +52,7 @@ class SparkCopyOnWriteScan extends SparkScan implements SupportsRuntimeFiltering

   private final TableScan scan;
   private final Snapshot snapshot;
+  private final SparkReadConf readConf;

   // lazy variables
   private List<FileScanTask> files = null; // lazy cache of files
@@ -79,6 +81,7 @@ class SparkCopyOnWriteScan extends SparkScan implements SupportsRuntimeFiltering

     this.scan = scan;
     this.snapshot = snapshot;
+    this.readConf = readConf;

     if (scan == null) {
       this.files = Collections.emptyList();
@@ -153,12 +156,19 @@ synchronized List<FileScanTask> files() {
   @Override
   protected synchronized List<CombinedScanTask> tasks() {
     if (tasks == null) {
-      CloseableIterable<FileScanTask> splitFiles =
-          TableScanUtil.splitFiles(
-              CloseableIterable.withNoopClose(files()), scan.targetSplitSize());
-      CloseableIterable<CombinedScanTask> scanTasks =
-          TableScanUtil.planTasks(
-              splitFiles, scan.targetSplitSize(), scan.splitLookback(), scan.splitOpenFileCost());
+      CloseableIterable<CombinedScanTask> scanTasks;
+      if (readConf.fileAsSplit()) {
+        scanTasks =
+            CloseableIterable.transform(
+                CloseableIterable.withNoopClose(files()), BaseCombinedScanTask::new);
+      } else {
+        CloseableIterable<FileScanTask> splitFiles =
+            TableScanUtil.splitFiles(
+                CloseableIterable.withNoopClose(files()), scan.targetSplitSize());
+        scanTasks =
+            TableScanUtil.planTasks(
+                splitFiles, scan.targetSplitSize(), scan.splitLookback(), scan.splitOpenFileCost());
+      }
       tasks = Lists.newArrayList(scanTasks);
     }

@@ -201,6 +211,10 @@ public String toString() {
         table(), expectedSchema().asStruct(), filterExpressions(), caseSensitive());
   }

+  public boolean fileAsSplit() {
+    return readConf.fileAsSplit();
+  }
+
   private Long currentSnapshotId() {
     Snapshot currentSnapshot = table().currentSnapshot();
     return currentSnapshot != null ? currentSnapshot.snapshotId() : null;

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java (+1 −0)

@@ -634,6 +634,7 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
               .dataFileFormat(format)
               .dataSchema(writeSchema)
               .dataSparkType(dsSchema)
+              .dataSortOrder(table.sortOrder())
               .build();

       if (spec.isUnpartitioned()) {

spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java (+41 −0)

@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;

+import java.util.Arrays;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.IsolationLevel;
 import org.apache.iceberg.PartitionSpec;
@@ -34,9 +35,11 @@
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.spark.SparkWriteConf;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.SortOrderUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.distributions.Distributions;
+import org.apache.spark.sql.connector.distributions.OrderedDistribution;
 import org.apache.spark.sql.connector.expressions.SortOrder;
 import org.apache.spark.sql.connector.iceberg.write.RowLevelOperation.Command;
 import org.apache.spark.sql.connector.read.Scan;
@@ -163,6 +166,22 @@ public Write build() {
       ordering = NO_ORDERING;
     }

+    // In case of a CopyOnWrite operation with a scan using file-as-split and an OrderedDistribution:
+    // * skip ordering by partition, iff all input data files are in the same partition and have the
+    //   same spec as the current table spec
+    // * skip ordering by table sort order, iff all input files are already sorted by the table's
+    //   current sort order
+    if (copyOnWriteScan != null
+        && copyOnWriteScan.fileAsSplit()
+        && distribution instanceof OrderedDistribution) {
+      if (skipOrderingAndDistribution((OrderedDistribution) distribution)) {
+        LOG.info(
+            "Skipping distribution/ordering: input files are already in required distribution/ordering");
+        ordering = NO_ORDERING;
+        distribution = Distributions.unspecified();
+      }
+    }
     return new SparkWrite(
         spark, table, writeConf, writeInfo, appId, writeSchema, dsSchema, distribution, ordering) {

@@ -265,4 +284,26 @@ private static Schema validateOrMergeWriteSchema(

     return writeSchema;
   }
+
+  private boolean skipOrderingAndDistribution(OrderedDistribution distribution) {
+    // check if all input files have the same partitioning as the current table partitioning
+    if (!copyOnWriteScan.files().stream()
+        .allMatch(x -> x.file().specId() == table.spec().specId())) {
+      return false;
+    }
+
+    // check if all input files are sorted on the table's current sort order
+    if (!copyOnWriteScan.files().stream()
+        .allMatch(
+            x ->
+                x.file().sortOrderId() != null
+                    && x.file().sortOrderId() == table.sortOrder().orderId())) {
+      return false;
+    }
+
+    // check if the required ordering is the same as the table's default ordering
+    return Arrays.equals(
+        distribution.ordering(),
+        SparkDistributionAndOrderingUtil.convert(SortOrderUtil.buildSortOrder(table)));
+  }
 }
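Taken together, these changes let a partition-scoped copy-on-write DELETE skip the shuffle/sort when every touched file already matches the table's current spec and sort order. A minimal end-to-end sketch driving that path through Spark SQL; the catalog, table name, and predicate are hypothetical:

    import org.apache.spark.sql.SparkSession;

    class PartitionScopedDelete {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("cow-delete-sketch")
            .getOrCreate();

        // Opt the table into one-file-per-split planning (property added by this commit).
        spark.sql("ALTER TABLE demo.db.events "
            + "SET TBLPROPERTIES ('read.split.file-as-split'='true')");

        // A DELETE restricted to a single partition; when the rewritten files are already
        // sorted on the table's sort order, SparkWriteBuilder drops the ordering/distribution step.
        spark.sql("DELETE FROM demo.db.events "
            + "WHERE event_date = DATE '2021-01-01' AND user_id < 0");

        spark.stop();
      }
    }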
