Core: Store split offset for delete files (apache#7011)
singhpk234 authored Mar 24, 2023
1 parent e394d86 commit faf974e
Showing 8 changed files with 90 additions and 17 deletions.
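In brief: this commit threads split offsets through the delete-file metadata path (FileMetadata.Builder, GenericDeleteFile, and the delete writers), so position-delete files can be split on row-group boundaries during scan planning, just like data files. A minimal sketch of the resulting builder usage — the path, sizes, and offsets are made-up values, and ofPositionDeletes()/withFormat() are assumed from the existing FileMetadata API rather than shown in this diff:

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class DeleteFileWithOffsetsExample {
  public static void main(String[] args) {
    DeleteFile deleteFile =
        FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
            .ofPositionDeletes()
            .withPath("/warehouse/db/tbl/deletes/pos-delete-0.parquet") // hypothetical path
            .withFormat(FileFormat.PARQUET)
            .withFileSizeInBytes(128L * 1024 * 1024)
            .withRecordCount(1000)
            // New in this commit: offsets are carried into the DeleteFile metadata.
            .withSplitOffsets(ImmutableList.of(4L, 64L * 1024 * 1024))
            .build();

    // Before this change, splitOffsets() was always null for delete files.
    System.out.println(deleteFile.splitOffsets());
  }
}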
13 changes: 4 additions & 9 deletions core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -27,7 +27,7 @@
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.util.ByteBuffers;

@@ -178,7 +178,8 @@ public Builder copy(DataFile toCopy) {
this.upperBounds = toCopy.upperBounds();
this.keyMetadata =
toCopy.keyMetadata() == null ? null : ByteBuffers.copy(toCopy.keyMetadata());
this.splitOffsets = toCopy.splitOffsets() == null ? null : copyList(toCopy.splitOffsets());
this.splitOffsets =
toCopy.splitOffsets() == null ? null : ImmutableList.copyOf(toCopy.splitOffsets());
this.sortOrderId = toCopy.sortOrderId();
return this;
}
@@ -261,7 +262,7 @@ public Builder withMetrics(Metrics metrics) {

public Builder withSplitOffsets(List<Long> offsets) {
if (offsets != null) {
this.splitOffsets = copyList(offsets);
this.splitOffsets = ImmutableList.copyOf(offsets);
} else {
this.splitOffsets = null;
}
@@ -312,10 +313,4 @@ public DataFile build() {
sortOrderId);
}
}

private static <E> List<E> copyList(List<E> toCopy) {
List<E> copy = Lists.newArrayListWithExpectedSize(toCopy.size());
copy.addAll(toCopy);
return copy;
}
}
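Side note on the change above: the builder now copies offsets with Guava's ImmutableList.copyOf instead of the removed copyList helper, so the stored list cannot be mutated through a leaked reference. A small illustrative sketch of the difference in copy semantics:

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class CopySemanticsSketch {
  public static void main(String[] args) {
    List<Long> offsets = Lists.newArrayList(0L, 64L, 128L);

    // Old approach: a defensive but still mutable ArrayList copy.
    List<Long> mutableCopy = Lists.newArrayList(offsets);
    mutableCopy.add(999L); // succeeds silently

    // New approach: an immutable copy that fails fast on accidental mutation.
    List<Long> immutableCopy = ImmutableList.copyOf(offsets);
    // immutableCopy.add(999L); // would throw UnsupportedOperationException
    System.out.println(immutableCopy);
  }
}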
13 changes: 13 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -19,13 +19,15 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FileStatus;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.ByteBuffers;

public class FileMetadata {
@@ -56,6 +58,7 @@ public static class Builder {
private Map<Integer, ByteBuffer> upperBounds = null;
private ByteBuffer keyMetadata = null;
private Integer sortOrderId = null;
private List<Long> splitOffsets = null;

Builder(PartitionSpec spec) {
this.spec = spec;
@@ -192,6 +195,15 @@ public Builder withMetrics(Metrics metrics) {
return this;
}

public Builder withSplitOffsets(List<Long> offsets) {
if (offsets != null) {
this.splitOffsets = ImmutableList.copyOf(offsets);
} else {
this.splitOffsets = null;
}
return this;
}

public Builder withEncryptionKeyMetadata(ByteBuffer newKeyMetadata) {
this.keyMetadata = newKeyMetadata;
return this;
@@ -249,6 +261,7 @@ public DeleteFile build() {
upperBounds),
equalityFieldIds,
sortOrderId,
splitOffsets,
keyMetadata);
}
}
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -40,6 +41,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
Metrics metrics,
int[] equalityFieldIds,
Integer sortOrderId,
List<Long> splitOffsets,
ByteBuffer keyMetadata) {
super(
specId,
@@ -55,7 +57,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
metrics.nanValueCounts(),
metrics.lowerBounds(),
metrics.upperBounds(),
null,
splitOffsets,
equalityFieldIds,
sortOrderId,
keyMetadata);
@@ -85,6 +85,7 @@ public void close() throws IOException {
.withEncryptionKeyMetadata(keyMetadata)
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.withSortOrder(sortOrder)
.build();
}
@@ -80,6 +80,7 @@ public void close() throws IOException {
.withPath(location)
.withPartition(partition)
.withEncryptionKeyMetadata(keyMetadata)
.withSplitOffsets(appender.splitOffsets())
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.build();
@@ -93,6 +93,7 @@ public class TestManifestWriterVersions {
METRICS,
EQUALITY_ID_ARR,
SORT_ORDER_ID,
null,
null);

@Rule public TemporaryFolder temp = new TemporaryFolder();
60 changes: 56 additions & 4 deletions core/src/test/java/org/apache/iceberg/TestSplitPlanning.java
@@ -249,6 +249,43 @@ public void testBasicSplitPlanningDeleteFiles() {
Assert.assertEquals(8, Iterables.size(posDeletesTable.newBatchScan().planTasks()));
}

@Test
public void testBasicSplitPlanningDeleteFilesWithSplitOffsets() {
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
List<DeleteFile> files128Mb = newDeleteFiles(4, 128 * 1024 * 1024, 8);
appendDeleteFiles(files128Mb);

PositionDeletesTable posDeletesTable = new PositionDeletesTable(table);

try (CloseableIterable<ScanTaskGroup<ScanTask>> groups =
posDeletesTable
.newBatchScan()
.option(TableProperties.SPLIT_SIZE, String.valueOf(64L * 1024 * 1024))
.planTasks()) {
int totalTaskGroups = 0;
for (ScanTaskGroup<ScanTask> group : groups) {
int tasksPerGroup = 0;
long previousOffset = -1;
for (ScanTask task : group.tasks()) {
tasksPerGroup++;
Assert.assertTrue(task instanceof SplitPositionDeletesScanTask);
SplitPositionDeletesScanTask splitPosDelTask = (SplitPositionDeletesScanTask) task;
if (previousOffset != -1) {
Assert.assertEquals(splitPosDelTask.start(), previousOffset);
}
previousOffset = splitPosDelTask.start() + splitPosDelTask.length();
}

Assert.assertEquals("Should have 1 task as result of task merge", 1, tasksPerGroup);
totalTaskGroups++;
}
// 4 files x 128 MB = 512 MB at a 64 MB split size, so we expect 8 bins
Assert.assertEquals(8, totalTaskGroups);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private void appendFiles(Iterable<DataFile> files) {
AppendFiles appendFiles = table.newAppend();
files.forEach(appendFiles::appendFile);
@@ -304,18 +341,23 @@ private void appendDeleteFiles(List<DeleteFile> files) {
}

private List<DeleteFile> newDeleteFiles(int numFiles, long sizeInBytes) {
return newDeleteFiles(numFiles, sizeInBytes, FileFormat.PARQUET);
return newDeleteFiles(numFiles, sizeInBytes, FileFormat.PARQUET, 1);
}

private List<DeleteFile> newDeleteFiles(int numFiles, long sizeInBytes, long numOffsets) {
return newDeleteFiles(numFiles, sizeInBytes, FileFormat.PARQUET, numOffsets);
}

private List<DeleteFile> newDeleteFiles(int numFiles, long sizeInBytes, FileFormat fileFormat) {
private List<DeleteFile> newDeleteFiles(
int numFiles, long sizeInBytes, FileFormat fileFormat, long numOffsets) {
List<DeleteFile> files = Lists.newArrayList();
for (int fileNum = 0; fileNum < numFiles; fileNum++) {
files.add(newDeleteFile(sizeInBytes, fileFormat));
files.add(newDeleteFile(sizeInBytes, fileFormat, numOffsets));
}
return files;
}

private DeleteFile newDeleteFile(long sizeInBytes, FileFormat fileFormat) {
private DeleteFile newDeleteFile(long sizeInBytes, FileFormat fileFormat, long numOffsets) {
String fileName = UUID.randomUUID().toString();
FileMetadata.Builder builder =
FileMetadata.deleteFileBuilder(PartitionSpec.unpartitioned())
@@ -324,6 +366,16 @@ private DeleteFile newDeleteFile(long sizeInBytes, FileFormat fileFormat) {
.withFileSizeInBytes(sizeInBytes)
.withRecordCount(2);

if (numOffsets > 1) {
long stepSize = sizeInBytes / numOffsets;
List<Long> offsets =
LongStream.range(0, numOffsets)
.map(i -> i * stepSize)
.boxed()
.collect(Collectors.toList());
builder.withSplitOffsets(offsets);
}

return builder.build();
}
}
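The 8-bin expectation in testBasicSplitPlanningDeleteFilesWithSplitOffsets above follows from simple arithmetic; a sketch with the test's parameters (values taken from the test, the helper class itself is hypothetical):

public class SplitMathSketch {
  public static void main(String[] args) {
    long fileSize = 128L * 1024 * 1024; // size of each delete file
    int numFiles = 4;
    int offsetsPerFile = 8; // eight 16 MB offset ranges per file
    long splitSize = 64L * 1024 * 1024;

    // 4 files x 128 MB = 512 MB, packed into 64 MB bins => 8 task groups.
    System.out.println((numFiles * fileSize) / splitSize); // 8

    // Each group holds four adjacent 16 MB offset ranges, which merge into 1 task.
    System.out.println(splitSize / (fileSize / offsetsPerFile)); // 4
  }
}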
@@ -235,9 +235,17 @@ public void testSplitTasks() throws IOException {

Table deleteTable =
MetadataTableUtils.createMetadataTableInstance(tab, MetadataTableType.POSITION_DELETES);
Assert.assertTrue(
"Position delete scan should produce more than one split",
Iterables.size(deleteTable.newBatchScan().planTasks()) > 1);

if (format.equals(FileFormat.AVRO)) {
Assert.assertTrue(
"Position delete scan should produce more than one split",
Iterables.size(deleteTable.newBatchScan().planTasks()) > 1);
} else {
Assert.assertEquals(
"Position delete scan should produce one split",
1,
Iterables.size(deleteTable.newBatchScan().planTasks()));
}

StructLikeSet actual = actual(tableName, tab);
StructLikeSet expected = expected(tab, deletes, null, posDeletes.path().toString());
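The format-dependent assertion above reflects how planning consumes the new metadata: a delete file that reports split offsets (Parquet or ORC here, each small enough to hold a single row group) is split exactly at those offsets and so yields one task, while a file that reports none (Avro) falls back to fixed-size splitting and can yield several. A rough sketch of that distinction — illustrative logic only, not Iceberg's actual planner code:

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class OffsetSplitSketch {
  // One {start, length} pair per planned split.
  static List<long[]> split(long fileLength, long targetSplitSize, List<Long> offsets) {
    ImmutableList.Builder<long[]> splits = ImmutableList.builder();
    if (offsets == null || offsets.isEmpty()) {
      // No offsets reported: fall back to fixed-size ranges.
      for (long start = 0; start < fileLength; start += targetSplitSize) {
        splits.add(new long[] {start, Math.min(targetSplitSize, fileLength - start)});
      }
    } else {
      // Offsets reported: one range per offset, so a single-row-group
      // file yields exactly one split regardless of target size.
      for (int i = 0; i < offsets.size(); i++) {
        long start = offsets.get(i);
        long end = i + 1 < offsets.size() ? offsets.get(i + 1) : fileLength;
        splits.add(new long[] {start, end - start});
      }
    }
    return splits.build();
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    System.out.println(split(128 * mb, 64 * mb, null).size()); // 2 splits
    System.out.println(split(128 * mb, 64 * mb, ImmutableList.of(4L)).size()); // 1 split
  }
}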
