Skip to content

Commit

Permalink
Core: Pushdown data_file.content filter in entries metadata table (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
dramaticlly authored Jun 24, 2024
1 parent 29fd2a0 commit 5733aec
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 16 deletions.
185 changes: 183 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseEntriesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.BoundReference;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionVisitors;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
Expand Down Expand Up @@ -68,6 +72,7 @@ static CloseableIterable<FileScanTask> planFiles(
Expression rowFilter = context.rowFilter();
boolean caseSensitive = context.caseSensitive();
boolean ignoreResiduals = context.ignoreResiduals();
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;

LoadingCache<Integer, ManifestEvaluator> evalCache =
Caffeine.newBuilder()
Expand All @@ -77,14 +82,18 @@ static CloseableIterable<FileScanTask> planFiles(
PartitionSpec transformedSpec = BaseFilesTable.transformSpec(tableSchema, spec);
return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
});
ManifestContentEvaluator manifestContentEvaluator =
new ManifestContentEvaluator(filter, tableSchema.asStruct(), caseSensitive);

CloseableIterable<ManifestFile> filteredManifests =
CloseableIterable.filter(
manifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
manifests,
manifest ->
evalCache.get(manifest.partitionSpecId()).eval(manifest)
&& manifestContentEvaluator.eval(manifest));

String schemaString = SchemaParser.toJson(projectedSchema);
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(
Expand All @@ -94,6 +103,178 @@ static CloseableIterable<FileScanTask> planFiles(
table, manifest, projectedSchema, schemaString, specString, residuals));
}

/**
 * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether a given data or delete
 * manifest should be included in the scan.
 *
 * <p>Only predicates on the {@code data_file.content} field are used for pruning; every other
 * predicate is answered conservatively with "rows might match". A manifest is pruned only when no
 * entry it can contain could satisfy the predicate: a data manifest can only contain {@link
 * FileContent#DATA} entries, while a delete manifest may contain both {@link
 * FileContent#POSITION_DELETES} and {@link FileContent#EQUALITY_DELETES} entries.
 */
private static class ManifestContentEvaluator {

  private final Expression boundExpr;

  /**
   * @param expr filter expression to evaluate against manifests
   * @param structType struct type the expression is bound to
   * @param caseSensitive whether field names are resolved case sensitively while binding
   */
  private ManifestContentEvaluator(
      Expression expr, Types.StructType structType, boolean caseSensitive) {
    Expression rewritten = Expressions.rewriteNot(expr);
    this.boundExpr = Binder.bind(structType, rewritten, caseSensitive);
  }

  /**
   * Tests whether the manifest may contain entries matching the bound expression.
   *
   * @param manifest manifest to test
   * @return false only if the manifest's content kind proves that no entry can match
   */
  private boolean eval(ManifestFile manifest) {
    // a fresh visitor per call keeps the per-manifest state out of the shared evaluator
    return new ManifestEvalVisitor().eval(manifest);
  }

  private class ManifestEvalVisitor extends ExpressionVisitors.BoundExpressionVisitor<Boolean> {

    // content id of the manifest currently being evaluated; set by eval(ManifestFile)
    private int manifestContentId;

    private static final boolean ROWS_MIGHT_MATCH = true;
    private static final boolean ROWS_CANNOT_MATCH = false;

    private boolean eval(ManifestFile manifestFile) {
      this.manifestContentId = manifestFile.content().id();
      return ExpressionVisitors.visitEvaluator(boundExpr, this);
    }

    @Override
    public Boolean alwaysTrue() {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public Boolean alwaysFalse() {
      return ROWS_CANNOT_MATCH;
    }

    @Override
    public Boolean not(Boolean result) {
      // NOTE(review): presumably unreachable because the expression went through
      // Expressions.rewriteNot before binding -- confirm
      return !result;
    }

    @Override
    public Boolean and(Boolean leftResult, Boolean rightResult) {
      return leftResult && rightResult;
    }

    @Override
    public Boolean or(Boolean leftResult, Boolean rightResult) {
      return leftResult || rightResult;
    }

    @Override
    public <T> Boolean isNull(BoundReference<T> ref) {
      if (fileContent(ref)) {
        return ROWS_CANNOT_MATCH; // data_file.content should not be null
      } else {
        return ROWS_MIGHT_MATCH;
      }
    }

    @Override
    public <T> Boolean notNull(BoundReference<T> ref) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean isNaN(BoundReference<T> ref) {
      if (fileContent(ref)) {
        return ROWS_CANNOT_MATCH; // data_file.content should not be nan
      } else {
        return ROWS_MIGHT_MATCH;
      }
    }

    @Override
    public <T> Boolean notNaN(BoundReference<T> ref) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
      if (fileContent(ref)) {
        Literal<Integer> intLit = lit.to(Types.IntegerType.get());
        if (!contentMatch(intLit.value())) {
          return ROWS_CANNOT_MATCH;
        }
      }
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
      if (fileContent(ref)) {
        Literal<Integer> intLit = lit.to(Types.IntegerType.get());
        // A single excluded value can rule out a whole manifest only when it is the sole content
        // the manifest can hold. Delete manifests may mix position and equality deletes, so
        // excluding one delete type must not prune them.
        if (manifestContentId == ManifestContent.DATA.id()
            && FileContent.DATA.id() == intLit.value()) {
          return ROWS_CANNOT_MATCH;
        }
      }
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean in(BoundReference<T> ref, Set<T> literalSet) {
      if (fileContent(ref)) {
        if (literalSet.stream().noneMatch(lit -> contentMatch((Integer) lit))) {
          return ROWS_CANNOT_MATCH;
        }
      }
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean notIn(BoundReference<T> ref, Set<T> literalSet) {
      if (fileContent(ref)) {
        // prune only if every content id this manifest can contain is excluded by the set
        if (allPossibleContentsIn(literalSet)) {
          return ROWS_CANNOT_MATCH;
        }
      }
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean startsWith(BoundReference<T> ref, Literal<T> lit) {
      return ROWS_MIGHT_MATCH;
    }

    @Override
    public <T> Boolean notStartsWith(BoundReference<T> ref, Literal<T> lit) {
      return ROWS_MIGHT_MATCH;
    }

    /** Returns true if the reference points at the {@code data_file.content} field. */
    private <T> boolean fileContent(BoundReference<T> ref) {
      return ref.fieldId() == DataFile.CONTENT.fieldId();
    }

    /**
     * Returns true if entries with the given file content id can be stored in the manifest being
     * evaluated; unknown content ids never match.
     */
    private boolean contentMatch(Integer fileContentId) {
      if (FileContent.DATA.id() == fileContentId) {
        return ManifestContent.DATA.id() == manifestContentId;
      } else if (FileContent.EQUALITY_DELETES.id() == fileContentId
          || FileContent.POSITION_DELETES.id() == fileContentId) {
        return ManifestContent.DELETES.id() == manifestContentId;
      } else {
        return false;
      }
    }

    /**
     * Returns true if the given set of file content ids contains every content id that an entry
     * of the manifest being evaluated can have.
     */
    private boolean allPossibleContentsIn(Set<?> fileContentIds) {
      if (manifestContentId == ManifestContent.DATA.id()) {
        return fileContentIds.contains(FileContent.DATA.id());
      } else if (manifestContentId == ManifestContent.DELETES.id()) {
        return fileContentIds.contains(FileContent.POSITION_DELETES.id())
            && fileContentIds.contains(FileContent.EQUALITY_DELETES.id());
      } else {
        // unknown manifest content: stay conservative and do not prune
        return false;
      }
    }
  }
}

static class ManifestReadTask extends BaseFileScanTask implements DataTask {
private final Schema projection;
private final Schema fileProjection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ protected static List<Object> parameters() {
return Arrays.asList(1, 2);
}

protected Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
.map(t -> (AllManifestsTable.ManifestListReadTask) t)
/**
 * Plans the given scan and collects the path of the file behind each planned task.
 *
 * @param scan table scan to plan
 * @return set of file paths, as strings, referenced by the planned tasks
 */
protected Set<String> scannedPaths(TableScan scan) {
  return StreamSupport.stream(scan.planFiles().spliterator(), false)
      .map(task -> task.file().path())
      .map(Object::toString)
      .collect(Collectors.toSet());
}
Expand Down
Loading

0 comments on commit 5733aec

Please sign in to comment.