Skip to content

Commit

Permalink
Core, Spark: Include content offset/size in PositionDeletesTable (#11808)
Browse files · Browse the repository at this point in the history
  • Loading branch information
nastra authored Jan 24, 2025
1 parent 2e2b728 commit 026a9b0
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 46 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ private MetadataColumns() {}
public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
public static final String PARTITION_COLUMN_NAME = "_partition";
public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";
public static final int CONTENT_OFFSET_COLUMN_ID = Integer.MAX_VALUE - 6;
public static final int CONTENT_SIZE_IN_BYTES_COLUMN_ID = Integer.MAX_VALUE - 7;

// IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
public static final NestedField DELETE_FILE_PATH =
Expand Down
73 changes: 49 additions & 24 deletions core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class PositionDeletesTable extends BaseMetadataTable {
public static final String PARTITION = "partition";
public static final String SPEC_ID = "spec_id";
public static final String DELETE_FILE_PATH = "delete_file_path";
public static final String CONTENT_OFFSET = "content_offset";
public static final String CONTENT_SIZE_IN_BYTES = "content_size_in_bytes";

private final Schema schema;
private final int defaultSpecId;
Expand Down Expand Up @@ -110,31 +112,54 @@ public Map<String, String> properties() {
}

private Schema calculateSchema() {
int formatVersion = TableUtil.formatVersion(table());
Types.StructType partitionType = Partitioning.partitionType(table());
List<Types.NestedField> columns =
ImmutableList.of(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
MetadataColumns.DELETE_FILE_ROW_FIELD_NAME,
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC),
Types.NestedField.required(
MetadataColumns.PARTITION_COLUMN_ID,
PARTITION,
partitionType,
"Partition that position delete row belongs to"),
Types.NestedField.required(
MetadataColumns.SPEC_ID_COLUMN_ID,
SPEC_ID,
Types.IntegerType.get(),
MetadataColumns.SPEC_ID_COLUMN_DOC),
Types.NestedField.required(
MetadataColumns.FILE_PATH_COLUMN_ID,
DELETE_FILE_PATH,
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));
ImmutableList.Builder<Types.NestedField> builder =
ImmutableList.<Types.NestedField>builder()
.add(MetadataColumns.DELETE_FILE_PATH)
.add(MetadataColumns.DELETE_FILE_POS)
.add(
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
MetadataColumns.DELETE_FILE_ROW_FIELD_NAME,
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC))
.add(
Types.NestedField.required(
MetadataColumns.PARTITION_COLUMN_ID,
PARTITION,
partitionType,
"Partition that position delete row belongs to"))
.add(
Types.NestedField.required(
MetadataColumns.SPEC_ID_COLUMN_ID,
SPEC_ID,
Types.IntegerType.get(),
MetadataColumns.SPEC_ID_COLUMN_DOC))
.add(
Types.NestedField.required(
MetadataColumns.FILE_PATH_COLUMN_ID,
DELETE_FILE_PATH,
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));

if (formatVersion >= 3) {
builder
.add(
Types.NestedField.optional(
MetadataColumns.CONTENT_OFFSET_COLUMN_ID,
CONTENT_OFFSET,
Types.LongType.get(),
"The offset in the DV where the content starts"))
.add(
Types.NestedField.optional(
MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID,
CONTENT_SIZE_IN_BYTES,
Types.LongType.get(),
"The length in bytes of the DV blob"));
}

List<Types.NestedField> columns = builder.build();

// Calculate used ids (for de-conflict)
Set<Integer> currentlyUsedIds =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,12 @@ public void testPositionDeletesManyColumns() {
table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit();

PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table);
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()).isEqualTo(2010);
int expectedIds =
formatVersion >= 3
? 2012 // partition col + 8 columns + 2003 ids inside the deleted row column
: 2010; // partition col + 6 columns + 2003 ids inside the deleted row column
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size())
.isEqualTo(expectedIds);

BatchScan scan = positionDeletesTable.newBatchScan();
assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;
Expand Down Expand Up @@ -79,6 +80,10 @@ public InternalRow next() {
rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID));
} else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) {
rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID));
} else if (fieldId == MetadataColumns.CONTENT_OFFSET_COLUMN_ID) {
rowValues.add(deleteFile.contentOffset());
} else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) {
rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,17 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException
Table positionDeletesTable =
catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes"));

Schema projectedSchema =
positionDeletesTable
.schema()
.select(
MetadataColumns.DELETE_FILE_PATH.name(),
MetadataColumns.DELETE_FILE_POS.name(),
PositionDeletesTable.DELETE_FILE_PATH);
List<String> columns =
Lists.newArrayList(
MetadataColumns.DELETE_FILE_PATH.name(),
MetadataColumns.DELETE_FILE_POS.name(),
PositionDeletesTable.DELETE_FILE_PATH);
if (formatVersion >= 3) {
columns.add(PositionDeletesTable.CONTENT_OFFSET);
columns.add(PositionDeletesTable.CONTENT_SIZE_IN_BYTES);
}

Schema projectedSchema = positionDeletesTable.schema().select(columns);

List<ScanTask> scanTasks =
Lists.newArrayList(
Expand All @@ -187,15 +191,27 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException

String dataFileLocation =
formatVersion >= 3 ? deleteFile1.referencedDataFile() : dataFile1.location();
Object[] first = {
UTF8String.fromString(dataFileLocation), 0L, UTF8String.fromString(deleteFile1.location())
};
Object[] second = {
UTF8String.fromString(dataFileLocation), 1L, UTF8String.fromString(deleteFile1.location())
};
List<Object> first =
Lists.newArrayList(
UTF8String.fromString(dataFileLocation),
0L,
UTF8String.fromString(deleteFile1.location()));
List<Object> second =
Lists.newArrayList(
UTF8String.fromString(dataFileLocation),
1L,
UTF8String.fromString(deleteFile1.location()));

if (formatVersion >= 3) {
first.add(deleteFile1.contentOffset());
first.add(deleteFile1.contentSizeInBytes());
second.add(deleteFile1.contentOffset());
second.add(deleteFile1.contentSizeInBytes());
}

assertThat(internalRowsToJava(actualRows, projectedSchema))
.hasSize(2)
.containsExactly(first, second);
.containsExactly(first.toArray(), second.toArray());
}

assertThat(scanTasks.get(1)).isInstanceOf(PositionDeletesScanTask.class);
Expand All @@ -214,15 +230,27 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException

String dataFileLocation =
formatVersion >= 3 ? deleteFile2.referencedDataFile() : dataFile2.location();
Object[] first = {
UTF8String.fromString(dataFileLocation), 2L, UTF8String.fromString(deleteFile2.location())
};
Object[] second = {
UTF8String.fromString(dataFileLocation), 3L, UTF8String.fromString(deleteFile2.location())
};
List<Object> first =
Lists.newArrayList(
UTF8String.fromString(dataFileLocation),
2L,
UTF8String.fromString(deleteFile2.location()));
List<Object> second =
Lists.newArrayList(
UTF8String.fromString(dataFileLocation),
3L,
UTF8String.fromString(deleteFile2.location()));

if (formatVersion >= 3) {
first.add(deleteFile2.contentOffset());
first.add(deleteFile2.contentSizeInBytes());
second.add(deleteFile2.contentOffset());
second.add(deleteFile2.contentSizeInBytes());
}

assertThat(internalRowsToJava(actualRows, projectedSchema))
.hasSize(2)
.containsExactly(first, second);
.containsExactly(first.toArray(), second.toArray());
}
}

Expand Down

0 comments on commit 026a9b0

Please sign in to comment.