Core: Add position deletes metadata table #6365
Conversation
@@ -64,9 +64,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
   */
  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
    PartitionSpec.Builder identitySpecBuilder =
-       PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
+       PartitionSpec.builderFor(metadataTableSchema)
Before this change, predicate pushdown would give the PositionDeletes scan tasks the wrong partition field id and spec id, so they would not work in the DeleteFile read.
Though it only happens in corner cases like dropped partition fields (where the auto-generated field ids are no longer correct). Added a test for this in TestMetadataTableScansWithPartitionEvolution.
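The field-id pitfall described above can be illustrated with a plain-Java sketch. This is not Iceberg's actual transformSpec implementation; the Field record and the id values are hypothetical. The idea is that rebuilding the spec by looking up each field's id by name preserves the ids stored with existing delete files, whereas freshly auto-generated ids drift once a partition field has been dropped.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TransformSpecSketch {

  record Field(String name, int fieldId) {}

  // Rebuild an identity "spec" over a metadata-table schema, carrying over the
  // original field ids by name rather than auto-generating fresh ones.
  static List<Field> transformSpec(List<Field> originalSpec, List<String> metadataSchema) {
    Map<String, Integer> idsByName = new LinkedHashMap<>();
    for (Field f : originalSpec) {
      idsByName.put(f.name(), f.fieldId());
    }
    List<Field> identitySpec = new ArrayList<>();
    for (String column : metadataSchema) {
      Integer originalId = idsByName.get(column);
      if (originalId != null) {
        identitySpec.add(new Field(column, originalId));
      }
    }
    return identitySpec;
  }

  public static void main(String[] args) {
    // Originally partitioned by "category" (id 1000) and "day" (id 1001);
    // "day" was later dropped from the table's partitioning.
    List<Field> original = List.of(new Field("category", 1000), new Field("day", 1001));
    List<Field> rebuilt = transformSpec(original, List.of("category"));
    System.out.println(rebuilt.size() + " " + rebuilt.get(0).fieldId()); // prints "1 1000"
  }
}
```

Had the sketch instead numbered fields positionally from a fixed base, "category" would get a new id after "day" was dropped and no longer match the ids in existing delete files.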
@@ -107,8 +111,15 @@ private MetadataColumns() {}
          ROW_POSITION.fieldId(),
          IS_DELETED.fieldId(),
          SPEC_ID.fieldId(),
-         PARTITION_COLUMN_ID);
+         PARTITION_COLUMN_ID,
+         POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
Why do we need a Position_Delete version of Spec_ID and File_Path, shouldn't we be able to use the original metadata columns for these?
Yeah, it's true, we could do that. I think I was going back and forth on whether we wanted to have that or not, as these are 'main' columns with a proper name, versus hidden columns (whose names start with _); in other words, they are not the exact same column. I'm open.
Do we even have to make them metadata columns then? I thought they would be just regular columns in a table. I don't think they should be added to META_COLUMNS. I think metadata columns should be only about columns we can project on demand. That's why we did not add changelog columns here.
Let me also think about reserving field IDs for them. It is similar yet different use case compared to changelog columns as there is no changelog table as such.
Removed from the list of metadata columns. It was from an earlier change where I was trying to re-use the existing Spark RowReader, which checked that a projected column must either be part of the schema or a metadata column.
Kept the field ids in this file to reserve them, to avoid conflicts with the "row" struct of this table, which has the data table schema.
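The reservation pattern being discussed can be sketched as follows. The two constants mirror the diffs in this PR, but the surrounding class is a standalone illustration, not Iceberg's MetadataColumns.

```java
class ReservedIds {
  // Metadata column ids count down from Integer.MAX_VALUE, while data schema
  // ids count up from 1, so in practice the two ranges cannot collide.
  static final int FILE_PATH_COLUMN_ID = Integer.MAX_VALUE - 1;

  // Id reserved for the position deletes table's partition column. The offset
  // (-107) mirrors the diff above; what matters is that it is unique within
  // the reserved high range, so it cannot clash with the field ids inside the
  // "row" struct, which carries the data table's schema.
  static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;

  public static void main(String[] args) {
    // Any realistic data column id sits far below the reserved range.
    int highestDataColumnId = 100_000;
    System.out.println(POSITION_DELETE_TABLE_PARTITION_FIELD_ID > highestDataColumnId); // prints "true"
  }
}
```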
@@ -234,6 +234,22 @@ public void testSplitPlanningWithOffsetsUnableToSplit() {
        "We should still only get 2 tasks per file", 32, Iterables.size(scan.planTasks()));
  }

+ @Test
+ public void testBasicSplitPlanningDeleteFiles() {
Do we store split offsets of Delete files? If so should we be checking the splitting on those boundaries?
I think not, but I could be wrong; I don't see it being set in the DeleteFile builder: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/FileMetadata.java#L38
Hmm, I guess we decided they will be smaller than data files, so it doesn't matter.
I don't see a reason why we wouldn't store that. I think it was overlooked.
@szehon-ho, can we do that in a follow-up PR? Not urgent, at least we should create an issue.
Made: #6659
I think this is really close; I have a few remaining questions, but just minor issues.
I'll have time to take a look tomorrow too.
  public static class PositionDeletesTableScan
      extends AbstractTableScan<
          BatchScan, org.apache.iceberg.ScanTask, ScanTaskGroup<org.apache.iceberg.ScanTask>>
Can we give ScanTask in this class a more specific name and then avoid this qualified import?
Done
    this.table = table;
  }

  protected Table table() {
Do we need a method?
Removed this, as I inherit now from BaseMetadataTable.
  PartitionSpec spec = transformedSpecs.get(entry.file().specId());
  String specString = PartitionSpecParser.toJson(spec);
  return new PositionDeleteScanTask(
      entry.file().copy(),
I spent some time on this. If we iterate through the tasks without making a copy here, it corrupts the older tasks already iterated through (setting all their file values to the latest task's!). See TestMetadataTableScans::testPositionDeletesUnpartitioned and run without this copy.
It looks like it's because the underlying AvroIterable reuses containers. So here I add copy() to avoid this problem. Maybe I can use copyWithoutStats()?
That would be the correct thing to do here; if you look at ManifestEntries.java, that's how it works, because you need the full list of tasks, not just an iterator over them.
To be clear, you need copyWithoutStats here since you aren't using the metrics past this point, just to save a bit of memory.
I think your other option is to just not return a parallel iterable and rely on callers to know they need to copy.
Thanks, yeah, it was something in the ParallelIterable that messed it up. Anyway, I changed to copyWithoutStats; it'll make it easier for the caller. I was definitely stumped on this for half a day; glad I added the extra test and saw this.
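The container-reuse bug is easy to reproduce outside Iceberg. A minimal sketch, where the Entry class and reusingReader are hypothetical stand-ins for the Avro reader's reused containers:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ContainerReuse {

  static class Entry {
    long snapshotId;

    Entry copy() {
      Entry e = new Entry();
      e.snapshotId = snapshotId;
      return e;
    }
  }

  // Mimics an Avro-style reader that returns the SAME container on every
  // call to next(), mutated in place.
  static Iterator<Entry> reusingReader(long... ids) {
    Entry shared = new Entry();
    return new Iterator<>() {
      int i = 0;

      public boolean hasNext() { return i < ids.length; }

      public Entry next() {
        shared.snapshotId = ids[i++];
        return shared;
      }
    };
  }

  public static void main(String[] args) {
    // Without copy(): every collected entry aliases the shared container,
    // so all of them end up showing the LAST value read.
    List<Entry> aliased = new ArrayList<>();
    reusingReader(1, 2, 3).forEachRemaining(aliased::add);
    System.out.println(aliased.get(0).snapshotId); // prints 3, not 1

    // With copy(): each collected entry owns its own snapshot of the state.
    List<Entry> copied = new ArrayList<>();
    reusingReader(1, 2, 3).forEachRemaining(e -> copied.add(e.copy()));
    System.out.println(copied.get(0).snapshotId); // prints 1
  }
}
```

Copying inside the scan (as the PR does with copyWithoutStats) trades a small allocation per task for callers never having to know about the reuse.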
I did another round. I'll need to check transformSpec with fresh eyes. Getting close, though. Thanks, @szehon-ho!
@@ -94,6 +94,10 @@ private MetadataColumns() {}
      Types.LongType.get(),
      "Commit snapshot ID");

+ public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;
If I understand correctly, the table schema will include these 3 columns in addition to columns in delete files. It is not bad to reserve some IDs, but have we thought about keeping the table schema limited to the content of delete files and supporting the already existing _spec_id, _partition, _file metadata columns? Values for metadata columns will only be projected on demand, just like we can do for regular tables.
It seems cleaner to me and shouldn't be hard to do since we will have a dedicated reader.
Thinking on this a little bit: one problem is that this table is partitioned by "partition", and if I remember correctly there's some complication in partitioning by a hidden metadata column. So I ended up making all of them actual columns.
  m -> {
    // Filter partitions
    CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
        ManifestFiles.readDeleteManifest(m, tableOps().io(), transformedSpecs)
Do we need to pass a projection while reading delete manifests?
I assume you mean with/without stats? Fixed. It took me a little bit to figure out: I needed to add a DELETE_SCAN_COLUMNS here that includes content, versus the base scan's SCAN_COLUMNS, which does not, as I have a filter on position delete file content.
  protected static final List<String> DELETE_SCAN_COLUMNS =
      ImmutableList.of(
          "snapshot_id",
          "content",
This has to differ from the data file's SCAN_COLUMNS by including content, which is used later in the scan to filter.
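As a standalone sketch of why the two projections differ (the exact column lists beyond snapshot_id and content are assumptions here, not Iceberg's actual constants): data manifests only describe data files, so the base scan's projection can drop content, but the delete-manifest projection must keep it in order to filter down to position deletes afterwards.

```java
import java.util.List;

class DeleteScanColumns {
  // Projection used when reading data manifests; "content" is unnecessary
  // there because every entry is a data file. (Illustrative column set.)
  static final List<String> SCAN_COLUMNS =
      List.of("snapshot_id", "file_path", "file_format", "partition");

  // Projection used when reading delete manifests; "content" distinguishes
  // position deletes from equality deletes, so the scan can filter on it.
  static final List<String> DELETE_SCAN_COLUMNS =
      List.of("snapshot_id", "content", "file_path", "file_format", "partition");

  public static void main(String[] args) {
    System.out.println(
        DELETE_SCAN_COLUMNS.contains("content") && !SCAN_COLUMNS.contains("content")); // prints "true"
  }
}
```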
Rebased and squashed commits.
Almost there!
@@ -30,12 +30,11 @@ public class MetadataColumns {
  private MetadataColumns() {}

  // IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
  public static final int FILE_PATH_COLUMN_ID = Integer.MAX_VALUE - 1;
+1 to this solution
  Types.NestedField.optional(
      MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
      "row",
      table().schema().asStruct(),
Once we add support to engines, we will have to test schema evolution.
Yea, good point, will need to check this.
  List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());

  Assert.assertEquals(
Added new tests for ScanMetrics. These cover the number of manifests skipped and read. Somehow the existing ManifestReader code we invoke does not update metrics about the number of delete files skipped and read (at least in the code path we use): https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L228
Made: #6658
LGTM. Great work, @szehon-ho! I left some optional comments. Feel free to merge whenever you are ready.
Thanks, @szehon-ho! Thanks for reviewing, @RussellSpitzer!
This breaks up PR #4812, and is just the part that adds the PositionDeletesTable metadata table.
It is now based on @aokolnychyi's newly-added BatchScan interface (#5922), added for this purpose so the scan is free to not return FileScanTask. It returns a custom ScanTask that scans DeleteFiles rather than DataFiles.
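The shape of this design can be sketched with illustrative names (not Iceberg's actual class hierarchy): parameterizing the scan task by its content-file type lets the position deletes table hand out tasks over delete files, without pretending they are file scan tasks over data files.

```java
class ScanTaskSketch {

  interface ContentFile {
    String path();
  }

  record DataFile(String path) implements ContentFile {}

  record DeleteFile(String path) implements ContentFile {}

  // A scan task is parameterized by the kind of content file it reads, so a
  // scan is free to produce tasks that are not tied to data files.
  interface ScanTask<F extends ContentFile> {
    F file();
  }

  // The position deletes table's tasks read DeleteFiles directly.
  record PositionDeletesScanTask(DeleteFile file) implements ScanTask<DeleteFile> {}

  public static void main(String[] args) {
    ScanTask<DeleteFile> task =
        new PositionDeletesScanTask(new DeleteFile("s3://bucket/pos-deletes-0.parquet"));
    System.out.println(task.file().path()); // prints "s3://bucket/pos-deletes-0.parquet"
  }
}
```

Under this sketch, planFiles() for the position deletes table would yield PositionDeletesScanTask instances, while a regular table scan yields tasks over DataFiles, which is the flexibility the BatchScan interface was added to allow.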