
Core: Add position deletes metadata table #6365

Merged: 1 commit into apache:master on Jan 24, 2023

Conversation

@szehon-ho (Collaborator) commented Dec 6, 2022:

This breaks up PR #4812, covering only the part that adds the PositionDeletesTable metadata table.

It is now based on @aokolnychyi's newly-added BatchScan interface (#5922), added for this purpose so the scan is free to not return FileScanTask. It returns a custom ScanTask that scans DeleteFiles rather than DataFiles.
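As a rough sketch of the idea (toy types for illustration, not the actual Iceberg interfaces), a scan that yields delete-file tasks instead of data-file tasks looks roughly like:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Toy stand-in for Iceberg's ScanTask marker interface.
interface ScanTask {}

// A task over a data file (roughly what FileScanTask models in Iceberg).
record DataFileScanTask(String dataFilePath) implements ScanTask {}

// A task over a position-delete file: the point of the PR is that a
// BatchScan may return tasks like this instead of FileScanTask.
record PositionDeleteScanTask(String deleteFilePath, int specId) implements ScanTask {}

public class PositionDeletesSketch {
  // Plan one toy "task" per delete file; names are illustrative only.
  static List<ScanTask> planDeleteFileTasks(List<String> deleteFiles, int specId) {
    return deleteFiles.stream()
        .map(path -> (ScanTask) new PositionDeleteScanTask(path, specId))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ScanTask> tasks =
        planDeleteFileTasks(Arrays.asList("d1.parquet", "d2.parquet"), 0);
    System.out.println(tasks.size()); // 2
  }
}
```

This only mirrors the shape of the design; the real table returns PositionDeleteScanTask instances planned from delete manifests.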

```diff
@@ -64,9 +64,12 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
   */
  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
    PartitionSpec.Builder identitySpecBuilder =
-       PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
+       PartitionSpec.builderFor(metadataTableSchema)
```
@szehon-ho (Collaborator, Author):
Before this change, predicate pushdown would give the PositionDeletes scan tasks the wrong partition field IDs and spec IDs, so they would not work in the DeleteFile read.

This only happens in corner cases like dropped partition fields (where the auto-generated field IDs are no longer correct). Added a test for this in TestMetadataTableScansWithPartitionEvolution.
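The failure mode with dropped partition fields can be sketched with toy types (PartitionField here is a stand-in, not Iceberg's class). Iceberg assigns partition field IDs starting at 1000, so once a field has been dropped from the spec, re-deriving IDs by position diverges from the IDs recorded in the files:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of why rebuilding a partition spec must carry over the
// original partition field IDs instead of auto-generating new ones.
public class SpecFieldIdSketch {
  record PartitionField(String name, int fieldId) {}

  // Naive rebuild: auto-assign IDs starting from 1000, the way a fresh
  // builder would. Wrong once a field has been dropped from the spec.
  static List<PartitionField> rebuildAutoIds(List<String> names) {
    List<PartitionField> fields = new ArrayList<>();
    int nextId = 1000;
    for (String name : names) {
      fields.add(new PartitionField(name, nextId++));
    }
    return fields;
  }

  // Correct rebuild: keep each remaining field's original ID.
  static List<PartitionField> rebuildKeepIds(List<PartitionField> original) {
    return new ArrayList<>(original);
  }

  public static void main(String[] args) {
    // The spec evolved: "ts_day" (ID 1000) was dropped; "category" keeps ID 1001.
    List<PartitionField> current = List.of(new PartitionField("category", 1001));
    System.out.println(rebuildAutoIds(List.of("category")).get(0).fieldId()); // 1000 (wrong)
    System.out.println(rebuildKeepIds(current).get(0).fieldId());            // 1001 (right)
  }
}
```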

```diff
@@ -107,8 +111,15 @@ private MetadataColumns() {}
          ROW_POSITION.fieldId(),
          IS_DELETED.fieldId(),
          SPEC_ID.fieldId(),
-         PARTITION_COLUMN_ID);
+         PARTITION_COLUMN_ID,
+         POSITION_DELETE_TABLE_PARTITION_FIELD_ID,
```
@RussellSpitzer (Member):
Why do we need a position-delete version of SPEC_ID and FILE_PATH? Shouldn't we be able to use the original metadata columns for these?

@szehon-ho (Collaborator, Author):

Yes, it's true, we could do that. I was going back and forth on whether we wanted that, as these are 'main' columns with proper names, versus hidden columns (whose names start with `_`); in other words, they are not exactly the same column. I'm open.

@aokolnychyi (Contributor):

Do we even have to make them metadata columns, then? I thought they would just be regular columns in the table. I don't think they should be added to META_COLUMNS. I think metadata columns should only be columns we can project on demand. That's why we did not add changelog columns here.

Let me also think about reserving field IDs for them. It is a similar yet different use case compared to changelog columns, as there is no changelog table as such.

@szehon-ho (Collaborator, Author):

Removed from the list of metadata columns. That was left over from an earlier change where I was trying to re-use the existing Spark RowReader, which checked that a projected column must be either part of the schema or a metadata column.

Kept the field IDs in this file to reserve them, to avoid conflicts with the "row" struct of this table, which has the data table's schema.
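The reservation scheme can be illustrated with a minimal sketch: reserved IDs count down from Integer.MAX_VALUE (the PR reserves POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107), while data-schema field IDs, including those nested in the "row" struct, count up from 1, so the two ranges cannot collide in practice. The `conflicts` helper here is hypothetical:

```java
// Minimal sketch of the field-ID reservation scheme. The constant value
// mirrors the one added to MetadataColumns in this PR; the check itself
// is an illustrative helper, not Iceberg code.
public class ReservedIdsSketch {
  static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;

  // A reserved ID near Integer.MAX_VALUE conflicts with the data schema
  // only if the schema's IDs (assigned counting up from 1) ever reach it.
  static boolean conflicts(int reservedId, int highestDataFieldId) {
    return reservedId <= highestDataFieldId;
  }

  public static void main(String[] args) {
    // Even a very wide "row" struct stays far below the reserved range.
    System.out.println(conflicts(POSITION_DELETE_TABLE_PARTITION_FIELD_ID, 10_000)); // false
  }
}
```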

```diff
@@ -234,6 +234,22 @@ public void testSplitPlanningWithOffsetsUnableToSplit() {
        "We should still only get 2 tasks per file", 32, Iterables.size(scan.planTasks()));
  }

+  @Test
+  public void testBasicSplitPlanningDeleteFiles() {
```
@RussellSpitzer (Member):

Do we store split offsets for delete files? If so, should we be checking that splitting happens on those boundaries?

@szehon-ho (Collaborator, Author):

I think not, but I could be wrong; I don't see it being set in the DeleteFile builder: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/FileMetadata.java#L38

@RussellSpitzer (Member):

Hmm, I guess we decided they will be smaller than data files, so it probably doesn't matter.

@aokolnychyi (Contributor):

I don't see a reason why we wouldn't store that. I think it was overlooked.
@szehon-ho, can we do that in a follow-up PR? Not urgent, but at least we should create an issue.

@szehon-ho (Collaborator, Author):

Made: #6659
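The difference the reviewers are discussing, splitting purely by a target size versus snapping splits to recorded split offsets (e.g. Parquet row-group boundaries), can be sketched in a self-contained way. This is illustrative only, not Iceberg's actual planner:

```java
import java.util.ArrayList;
import java.util.List;

// Toy split planner contrasting fixed-size splits with offset-aware splits.
public class SplitPlanningSketch {
  // Fixed-size splits: may cut through the middle of a row group.
  static List<long[]> splitBySize(long fileLength, long targetSize) {
    List<long[]> splits = new ArrayList<>();
    for (long start = 0; start < fileLength; start += targetSize) {
      splits.add(new long[] {start, Math.min(targetSize, fileLength - start)});
    }
    return splits;
  }

  // Offset-aware splits: each split begins at a recorded offset, so a
  // reader never has to straddle a row-group boundary.
  static List<long[]> splitByOffsets(long fileLength, List<Long> offsets) {
    List<long[]> splits = new ArrayList<>();
    for (int i = 0; i < offsets.size(); i++) {
      long start = offsets.get(i);
      long end = (i + 1 < offsets.size()) ? offsets.get(i + 1) : fileLength;
      splits.add(new long[] {start, end - start});
    }
    return splits;
  }

  public static void main(String[] args) {
    // A 100-byte file with row groups starting at offsets 0, 40 and 70.
    System.out.println(splitBySize(100, 32).size());                       // 4
    System.out.println(splitByOffsets(100, List.of(0L, 40L, 70L)).size()); // 3
  }
}
```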

@RussellSpitzer (Member) left a review:

I think this is really close. I have a few remaining questions, but they're just minor issues.

@aokolnychyi (Contributor):

I'll have time to take a look tomorrow too.

core/src/main/java/org/apache/iceberg/BaseTableScan.java (outdated conversation; resolved)


```java
public static class PositionDeletesTableScan
    extends AbstractTableScan<
        BatchScan, org.apache.iceberg.ScanTask, ScanTaskGroup<org.apache.iceberg.ScanTask>>
```
@aokolnychyi (Contributor):

Can we give ScanTask in this class a more specific name and then avoid this qualified import?

@szehon-ho (Collaborator, Author):

Done

```java
  this.table = table;
}

protected Table table() {
```
@aokolnychyi (Contributor):

Do we need this method?

@szehon-ho (Collaborator, Author):

Removed this, as I now inherit from BaseMetadataTable.

```java
PartitionSpec spec = transformedSpecs.get(entry.file().specId());
String specString = PartitionSpecParser.toJson(spec);
return new PositionDeleteScanTask(
    entry.file().copy(),
```
@szehon-ho (Collaborator, Author):

I spent some time on this. If we iterate through the tasks without making a copy here, it corrupts the older tasks already iterated through (setting all of their file values to the latest task's!). See TestMetadataTableScans::testPositionDeletesUnpartitioned and run it without this copy.

It looks like it's because the underlying AvroIterable reuses containers. So here I add copy() to avoid the problem. Maybe I can use copyWithoutStats()?

@RussellSpitzer (Member):

That would be the correct thing to do here. If you look at ManifestEntries.java, that's how it works, because you need the full list of tasks, not just an iterator over them.

@RussellSpitzer (Member):

To be clear, you need copyWithoutStats here, since you aren't using the metrics past this point. Just to save a bit of memory.

@RussellSpitzer (Member):

I think your other option is to just not return a parallel iterable and rely on callers to know they need to copy.

@szehon-ho (Collaborator, Author) commented Dec 21, 2022:

Thanks, yes, it was something in ParallelIterable that messed it up. Anyway, I changed to copyWithoutStats(); it'll make things easier for the caller. I was definitely stumped on this for half a day; glad I added the extra test and caught it.
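The container-reuse pitfall described in this thread can be reproduced with a self-contained toy (these are illustrative types, not the actual Avro reader): an iterator that hands back the same mutable holder for every element corrupts earlier elements unless the caller copies each one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ContainerReuseDemo {
  static class FileHolder {
    String path;

    FileHolder copy() {
      FileHolder c = new FileHolder();
      c.path = this.path;
      return c;
    }
  }

  // Yields the SAME FileHolder instance for every element, mutated in
  // place, the way a container-reusing reader does.
  static Iterator<FileHolder> reusingIterator(List<String> paths) {
    FileHolder shared = new FileHolder();
    Iterator<String> it = paths.iterator();
    return new Iterator<>() {
      public boolean hasNext() { return it.hasNext(); }
      public FileHolder next() { shared.path = it.next(); return shared; }
    };
  }

  public static void main(String[] args) {
    List<String> paths = List.of("delete-1.parquet", "delete-2.parquet");

    // Collecting without copy(): every element ends up as the last value.
    List<FileHolder> noCopy = new ArrayList<>();
    reusingIterator(paths).forEachRemaining(noCopy::add);
    System.out.println(noCopy.get(0).path); // delete-2.parquet (corrupted!)

    // Collecting with copy(): earlier elements are preserved.
    List<FileHolder> copied = new ArrayList<>();
    reusingIterator(paths).forEachRemaining(h -> copied.add(h.copy()));
    System.out.println(copied.get(0).path); // delete-1.parquet
  }
}
```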

@aokolnychyi (Contributor) left a review:

I did another round. I'll need to check transformSpec with fresh eyes. Getting close, though. Thanks, @szehon-ho!

core/src/main/java/org/apache/iceberg/MetadataColumns.java (outdated conversation; resolved)
```diff
@@ -94,6 +94,10 @@ private MetadataColumns() {}
          Types.LongType.get(),
          "Commit snapshot ID");

+  public static final int POSITION_DELETE_TABLE_PARTITION_FIELD_ID = Integer.MAX_VALUE - 107;
```
@aokolnychyi (Contributor):

If I understand correctly, the table schema will include these 3 columns in addition to the columns in delete files. It is not bad to reserve some IDs, but have we thought about keeping the table schema limited to the content of delete files and supporting the already existing _spec_id, _partition, and _file metadata columns? Values for metadata columns would only be projected on demand, just like we can do for regular tables.

It seems cleaner to me and shouldn't be hard to do, since we will have a dedicated reader.

@szehon-ho (Collaborator, Author) commented Jan 5, 2023:

Thinking on this a little: one problem is that this table is partitioned by "partition", and if I remember correctly there's some complication in partitioning by a hidden metadata column. So I ended up making all of them actual columns.

```java
m -> {
  // Filter partitions
  CloseableIterable<ManifestEntry<DeleteFile>> deleteFileEntries =
      ManifestFiles.readDeleteManifest(m, tableOps().io(), transformedSpecs)
```
@aokolnychyi (Contributor):

Do we need to pass a projection while reading delete manifests?

@szehon-ho (Collaborator, Author) commented Jan 5, 2023:

I assume you mean with/without stats? Fixed. It took me a little bit to figure out: I needed to add a DELETE_SCAN_COLUMNS here that includes content, versus the base scan's SCAN_COLUMNS, which does not, since I filter on position-delete file content.

```java
protected static final List<String> DELETE_SCAN_COLUMNS =
    ImmutableList.of(
        "snapshot_id",
        "content",
```
@szehon-ho (Collaborator, Author):

This has to differ from the data file's SCAN_COLUMNS by including content, which is used later in the scan for filtering.
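The reason "content" must be in the projected columns can be shown with a toy projection over map-shaped records (illustrative only, not the manifest reader): a filter can only look at fields that were actually read. The content codes follow the Iceberg spec (0 = data, 1 = position deletes, 2 = equality deletes).

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ScanColumnsSketch {
  static final int POSITION_DELETES = 1;

  // Keep only the projected columns of each toy manifest entry.
  static List<Map<String, Object>> project(
      List<Map<String, Object>> entries, List<String> columns) {
    return entries.stream()
        .map(e -> e.entrySet().stream()
            .filter(kv -> columns.contains(kv.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Map<String, Object>> entries = List.of(
        Map.of("file_path", "pos-del.parquet", "content", 1),
        Map.of("file_path", "eq-del.parquet", "content", 2));

    // Without "content" in the projection, a later content filter has nothing to read.
    System.out.println(
        project(entries, List.of("file_path")).get(0).containsKey("content")); // false

    // With "content" projected, position deletes can be selected.
    long posDeletes = project(entries, List.of("file_path", "content")).stream()
        .filter(e -> (int) e.get("content") == POSITION_DELETES)
        .count();
    System.out.println(posDeletes); // 1
  }
}
```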

@szehon-ho (Collaborator, Author):

Rebased and squashed commits.

@aokolnychyi (Contributor) left a review:

Almost there!

@@ -30,12 +30,11 @@ public class MetadataColumns {
private MetadataColumns() {}

// IDs Integer.MAX_VALUE - (1-100) are used for metadata columns
public static final int FILE_PATH_COLUMN_ID = Integer.MAX_VALUE - 1;
@aokolnychyi (Contributor):

+1 to this solution

```java
Types.NestedField.optional(
    MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
    "row",
    table().schema().asStruct(),
```
@aokolnychyi (Contributor):

Once we add support to engines, we will have to test schema evolution.

@szehon-ho (Collaborator, Author):

Yes, good point; we'll need to check this.

core/src/main/java/org/apache/iceberg/SnapshotScan.java (outdated conversation; resolved)

```java
List<ScanTask> tasks = Lists.newArrayList(scan.planFiles());

Assert.assertEquals(
```
@szehon-ho (Collaborator, Author):

Added new tests for ScanMetrics. These cover the number of manifests skipped and read. Somehow the existing ManifestReader code we invoke does not update metrics for the number of delete files skipped and read (at least in the code path we use): https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestReader.java#L228

@szehon-ho (Collaborator, Author):

Made: #6658

@aokolnychyi (Contributor) left a review:

LGTM. Great work, @szehon-ho! I left some optional comments. Feel free to merge whenever you are ready.


@aokolnychyi aokolnychyi merged commit 3c27c57 into apache:master Jan 24, 2023
@aokolnychyi (Contributor):

Thanks, @szehon-ho! Thanks for reviewing, @RussellSpitzer!

@szehon-ho (Collaborator, Author):

Thanks, created #6658 and #6659 to track existing issues found here.
