From 5733aecd0d010b17b9bd0f10aaccb601b1f13c90 Mon Sep 17 00:00:00 2001 From: Hongyue/Steve Zhang Date: Mon, 24 Jun 2024 10:39:26 -0700 Subject: [PATCH] Core: Pushdown data_file.content filter in entries metadata table (#10203) --- .../org/apache/iceberg/BaseEntriesTable.java | 185 +++++++++++++++++- .../iceberg/MetadataTableScanTestBase.java | 5 +- .../iceberg/TestMetadataTableScans.java | 148 ++++++++++++-- 3 files changed, 322 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java index 43d8a71f8706..4e485d516f12 100644 --- a/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseEntriesTable.java @@ -23,8 +23,12 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.iceberg.expressions.Binder; +import org.apache.iceberg.expressions.BoundReference; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionVisitors; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.expressions.ManifestEvaluator; import org.apache.iceberg.expressions.ResidualEvaluator; import org.apache.iceberg.io.CloseableIterable; @@ -68,6 +72,7 @@ static CloseableIterable planFiles( Expression rowFilter = context.rowFilter(); boolean caseSensitive = context.caseSensitive(); boolean ignoreResiduals = context.ignoreResiduals(); + Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; LoadingCache evalCache = Caffeine.newBuilder() @@ -77,14 +82,18 @@ static CloseableIterable planFiles( PartitionSpec transformedSpec = BaseFilesTable.transformSpec(tableSchema, spec); return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive); }); + ManifestContentEvaluator manifestContentEvaluator = + new ManifestContentEvaluator(filter, tableSchema.asStruct(), caseSensitive); CloseableIterable filteredManifests = CloseableIterable.filter( - manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest)); + manifests, + manifest -> + evalCache.get(manifest.partitionSpecId()).eval(manifest) + && manifestContentEvaluator.eval(manifest)); String schemaString = SchemaParser.toJson(projectedSchema); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); - Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter; ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter); return CloseableIterable.transform( @@ -94,6 +103,178 @@ static CloseableIterable planFiles( table, manifest, projectedSchema, schemaString, specString, residuals)); } + /** + * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether a given data or + * delete manifests shall be included in the scan + */ + private static class ManifestContentEvaluator { + + private final Expression boundExpr; + + private ManifestContentEvaluator( + Expression expr, Types.StructType structType, boolean caseSensitive) { + Expression rewritten = Expressions.rewriteNot(expr); + this.boundExpr = Binder.bind(structType, rewritten, caseSensitive); + } + + private boolean eval(ManifestFile manifest) { + return new ManifestEvalVisitor().eval(manifest); + } + + private class ManifestEvalVisitor extends ExpressionVisitors.BoundExpressionVisitor { + + private int manifestContentId; + + private static final boolean ROWS_MIGHT_MATCH = true; + private static final boolean ROWS_CANNOT_MATCH = false; + + private boolean eval(ManifestFile manifestFile) { + this.manifestContentId = manifestFile.content().id(); + return ExpressionVisitors.visitEvaluator(boundExpr, this); + } + + @Override + public Boolean alwaysTrue() { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean alwaysFalse() { + return ROWS_CANNOT_MATCH; + } + + @Override + public Boolean not(Boolean result) { + return !result; + } + + @Override + public Boolean and(Boolean leftResult, Boolean rightResult) { + return leftResult && rightResult; + } + + @Override + public Boolean or(Boolean leftResult, Boolean rightResult) { + return leftResult || rightResult; + } + + @Override + public Boolean isNull(BoundReference ref) { + if (fileContent(ref)) { + return ROWS_CANNOT_MATCH; // date_file.content should not be null + } else { + return ROWS_MIGHT_MATCH; + } + } + + @Override + public Boolean notNull(BoundReference ref) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean isNaN(BoundReference ref) { + if (fileContent(ref)) { + return ROWS_CANNOT_MATCH; // date_file.content should not be nan + } else { + return ROWS_MIGHT_MATCH; + } + } + + @Override + public Boolean notNaN(BoundReference ref) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean lt(BoundReference ref, Literal lit) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean ltEq(BoundReference ref, Literal lit) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean gt(BoundReference ref, Literal lit) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean gtEq(BoundReference ref, Literal lit) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean eq(BoundReference ref, Literal lit) { + if (fileContent(ref)) { + Literal intLit = lit.to(Types.IntegerType.get()); + if (!contentMatch(intLit.value())) { + return ROWS_CANNOT_MATCH; + } + } + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean notEq(BoundReference ref, Literal lit) { + if (fileContent(ref)) { + Literal intLit = lit.to(Types.IntegerType.get()); + if (contentMatch(intLit.value())) { + return ROWS_CANNOT_MATCH; + } + } + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean in(BoundReference ref, Set literalSet) { + if (fileContent(ref)) { + if (literalSet.stream().noneMatch(lit -> contentMatch((Integer) lit))) { + return ROWS_CANNOT_MATCH; + } + } + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean notIn(BoundReference ref, Set literalSet) { + if (fileContent(ref)) { + if (literalSet.stream().anyMatch(lit -> contentMatch((Integer) lit))) { + return ROWS_CANNOT_MATCH; + } + } + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean startsWith(BoundReference ref, Literal lit) { + return ROWS_MIGHT_MATCH; + } + + @Override + public Boolean notStartsWith(BoundReference ref, Literal lit) { + return ROWS_MIGHT_MATCH; + } + + private boolean fileContent(BoundReference ref) { + return ref.fieldId() == DataFile.CONTENT.fieldId(); + } + + private boolean contentMatch(Integer fileContentId) { + if (FileContent.DATA.id() == fileContentId) { + return ManifestContent.DATA.id() == manifestContentId; + } else if (FileContent.EQUALITY_DELETES.id() == fileContentId + || FileContent.POSITION_DELETES.id() == fileContentId) { + return ManifestContent.DELETES.id() == manifestContentId; + } else { + return false; + } + } + } + } + static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final Schema projection; private final Schema fileProjection; diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java index 9c732e843c8b..a4e964b017ba 100644 --- a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java +++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java @@ -43,9 +43,8 @@ protected static List parameters() { return Arrays.asList(1, 2); } - protected Set actualManifestListPaths(TableScan allManifestsTableScan) { - return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false) - .map(t -> (AllManifestsTable.ManifestListReadTask) t) + protected Set scannedPaths(TableScan scan) { + return StreamSupport.stream(scan.planFiles().spliterator(), false) .map(t -> t.file().path().toString()) .collect(Collectors.toSet()); } diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index df314f6a802f..0a3040939c04 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -41,6 +41,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.Iterators; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -227,6 +228,131 @@ public void testManifestEntriesTableWithDroppedPartition() throws IOException { } } + @TestTemplate + public void testEntriesTableDataFileContentEq() { + preparePartitionedTable(); + + Table entriesTable = new ManifestEntriesTable(table); + + Expression dataOnly = Expressions.equal("data_file.content", 0); + TableScan entriesTableScan = entriesTable.newScan().filter(dataOnly); + Set expected = + table.currentSnapshot().dataManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + + assertThat(scannedPaths(entriesTableScan)) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(expected); + + assertThat( + scannedPaths(entriesTable.newScan().filter(Expressions.equal("data_file.content", 3)))) + .as("Expected manifest filter by data file content does not match") + .isEmpty(); + } + + @TestTemplate + public void testEntriesTableDateFileContentNotEq() { + preparePartitionedTable(); + + Table entriesTable = new ManifestEntriesTable(table); + + Expression notData = Expressions.notEqual("data_file.content", 0); + TableScan entriesTableScan = entriesTable.newScan().filter(notData); + Set expected = + table.currentSnapshot().deleteManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + + assertThat(scannedPaths(entriesTableScan)) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(expected); + + Set allManifests = + table.currentSnapshot().allManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + assertThat( + scannedPaths( + entriesTable.newScan().filter(Expressions.notEqual("data_file.content", 3)))) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(allManifests); + } + + @TestTemplate + public void testEntriesTableDataFileContentIn() { + preparePartitionedTable(); + Table entriesTable = new ManifestEntriesTable(table); + + Expression in0 = Expressions.in("data_file.content", 0); + TableScan scan1 = entriesTable.newScan().filter(in0); + Set expectedDataManifestPath = + table.currentSnapshot().dataManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + assertThat(scannedPaths(scan1)) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(expectedDataManifestPath); + + Expression in12 = Expressions.in("data_file.content", 1, 2); + TableScan scan2 = entriesTable.newScan().filter(in12); + Set expectedDeleteManifestPath = + table.currentSnapshot().deleteManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + assertThat(scannedPaths(scan2)) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(expectedDeleteManifestPath); + + Expression inAll = Expressions.in("data_file.content", 0, 1, 2); + Set allManifests = Sets.union(expectedDataManifestPath, expectedDeleteManifestPath); + assertThat(scannedPaths(entriesTable.newScan().filter(inAll))) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(allManifests); + + Expression inNeither = Expressions.in("data_file.content", 3, 4); + assertThat(scannedPaths(entriesTable.newScan().filter(inNeither))) + .as("Expected manifest filter by data file content does not match") + .isEmpty(); + } + + @TestTemplate + public void testEntriesTableDataFileContentNotIn() { + preparePartitionedTable(); + Table entriesTable = new ManifestEntriesTable(table); + + Expression notIn0 = Expressions.notIn("data_file.content", 0); + TableScan scan1 = entriesTable.newScan().filter(notIn0); + Set expectedDeleteManifestPath = + table.currentSnapshot().deleteManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + assertThat(scannedPaths(scan1)) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(expectedDeleteManifestPath); + + Expression notIn12 = Expressions.notIn("data_file.content", 1, 2); + TableScan scan2 = entriesTable.newScan().filter(notIn12); + Set expectedDataManifestPath = + table.currentSnapshot().dataManifests(table.io()).stream() + .map(ManifestFile::path) + .collect(Collectors.toSet()); + assertThat(scannedPaths(scan2)) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(expectedDataManifestPath); + + Expression notInNeither = Expressions.notIn("data_file.content", 3); + Set allManifests = Sets.union(expectedDataManifestPath, expectedDeleteManifestPath); + assertThat(scannedPaths(entriesTable.newScan().filter(notInNeither))) + .as("Expected manifest filter by data file content does not match") + .isEqualTo(allManifests); + + Expression notInAll = Expressions.notIn("data_file.content", 0, 1, 2); + assertThat(scannedPaths(entriesTable.newScan().filter(notInAll))) + .as("Expected manifest filter by data file content does not match") + .isEmpty(); + } + @TestTemplate public void testAllDataFilesTableHonorsIgnoreResiduals() throws IOException { table.newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); @@ -1081,7 +1207,7 @@ public void testAllManifestsTableSnapshotGt() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.greaterThan("reference_snapshot_id", 2)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 3L, 4L)); } @@ -1095,7 +1221,7 @@ public void testAllManifestsTableSnapshotGte() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.greaterThanOrEqual("reference_snapshot_id", 3)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 3L, 4L)); } @@ -1109,7 +1235,7 @@ public void testAllManifestsTableSnapshotLt() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.lessThan("reference_snapshot_id", 3)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 2L)); } @@ -1123,7 +1249,7 @@ public void testAllManifestsTableSnapshotLte() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.lessThanOrEqual("reference_snapshot_id", 2)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 2L)); } @@ -1137,7 +1263,7 @@ public void testAllManifestsTableSnapshotEq() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.equal("reference_snapshot_id", 2)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L)); } @@ -1151,7 +1277,7 @@ public void testAllManifestsTableSnapshotNotEq() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.notEqual("reference_snapshot_id", 2)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L)); } @@ -1165,7 +1291,7 @@ public void testAllManifestsTableSnapshotIn() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.in("reference_snapshot_id", 1, 3)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L)); } @@ -1179,7 +1305,7 @@ public void testAllManifestsTableSnapshotNotIn() { TableScan manifestsTableScan = manifestsTable.newScan().filter(Expressions.notIn("reference_snapshot_id", 1, 3)); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L, 4L)); } @@ -1198,7 +1324,7 @@ public void testAllManifestsTableSnapshotAnd() { Expressions.and( Expressions.equal("reference_snapshot_id", 2), Expressions.greaterThan("length", 0))); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L)); } @@ -1217,7 +1343,7 @@ public void testAllManifestsTableSnapshotOr() { Expressions.or( Expressions.equal("reference_snapshot_id", 2), Expressions.equal("reference_snapshot_id", 4))); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 2L, 4L)); } @@ -1233,7 +1359,7 @@ public void testAllManifestsTableSnapshotNot() { .newScan() .filter(Expressions.not(Expressions.equal("reference_snapshot_id", 2))); - assertThat(actualManifestListPaths(manifestsTableScan)) + assertThat(scannedPaths(manifestsTableScan)) .as("Expected snapshots do not match") .isEqualTo(expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L)); }