Skip to content

Commit

Permalink
IMPALA-11289: Push down compound predicates to Iceberg
Browse files Browse the repository at this point in the history
This patch implements pushing compound predicates down to Iceberg. The
compound predicates include NOT, AND, and OR.

Testing:
 - Added end-to-end test

Change-Id: I27bc67b71033900c466183da5b1907ac90844177
Reviewed-on: http://gerrit.cloudera.org:8080/18535
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
  • Loading branch information
LPL authored and Impala Public Jenkins committed May 24, 2022
1 parent d00d346 commit b58966b
Show file tree
Hide file tree
Showing 4 changed files with 593 additions and 55 deletions.
131 changes: 104 additions & 27 deletions fe/src/main/java/org/apache/impala/planner/IcebergScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Expression.Operation;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.types.Types;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.BoolLiteral;
import org.apache.impala.analysis.CompoundPredicate;
import org.apache.impala.analysis.DateLiteral;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.InPredicate;
Expand Down Expand Up @@ -76,8 +78,8 @@ public class IcebergScanNode extends HdfsScanNode {

private final FeIcebergTable icebergTable_;

// Exprs in icebergConjuncts_ converted to UnboundPredicate.
private final List<UnboundPredicate> icebergPredicates_ = new ArrayList<>();
// Exprs in icebergConjuncts_ converted to Expression.
private final List<Expression> icebergPredicates_ = new ArrayList<>();

private TimeTravelSpec timeTravelSpec_;

Expand Down Expand Up @@ -208,87 +210,162 @@ private Operation getIcebergOperator(BinaryPredicate.Operator op) {
}
}

/**
 * Maps an Impala CompoundPredicate operator onto the equivalent Iceberg
 * expression operation.
 *
 * @param op the Impala compound operator (NOT, AND, OR)
 * @return the matching Iceberg {@link Operation}, or null if Iceberg has no
 *     equivalent for this operator
 */
private Operation getIcebergOperator(CompoundPredicate.Operator op) {
  if (op == CompoundPredicate.Operator.AND) return Operation.AND;
  if (op == CompoundPredicate.Operator.OR) return Operation.OR;
  if (op == CompoundPredicate.Operator.NOT) return Operation.NOT;
  // Any other compound operator cannot be pushed down to Iceberg.
  return null;
}

/**
 * Attempts to convert an Impala predicate into an Iceberg expression.
 * On success the converted expression is recorded in icebergPredicates_ so it
 * can be pushed down to the Iceberg table scan; on failure nothing happens.
 */
private void tryConvertIcebergPredicate(Analyzer analyzer, Expr expr)
    throws ImpalaException {
  Expression converted = convertIcebergPredicate(analyzer, expr);
  if (converted == null) return;
  icebergPredicates_.add(converted);
  LOG.debug("Push down the predicate: " + converted + " to iceberg");
}

/**
 * Dispatches predicate conversion based on the concrete Expr subclass.
 * Supported predicate kinds are binary (=, <, ...), IN, IS [NOT] NULL and
 * compound (NOT/AND/OR) predicates.
 *
 * @return the converted Iceberg expression, or null if this predicate kind
 *     (or its operands) cannot be pushed down to Iceberg
 */
private Expression convertIcebergPredicate(Analyzer analyzer, Expr expr)
    throws ImpalaException {
  // NOTE: this block was reconstructed from an interleaved diff that left the
  // pre-change (void, side-effecting) calls next to the post-change returns;
  // only the post-change form is kept here.
  if (expr instanceof BinaryPredicate) {
    return convertIcebergPredicate(analyzer, (BinaryPredicate) expr);
  } else if (expr instanceof InPredicate) {
    return convertIcebergPredicate(analyzer, (InPredicate) expr);
  } else if (expr instanceof IsNullPredicate) {
    return convertIcebergPredicate((IsNullPredicate) expr);
  } else if (expr instanceof CompoundPredicate) {
    return convertIcebergPredicate(analyzer, (CompoundPredicate) expr);
  } else {
    return null;
  }
}

private void convertIcebergPredicate(Analyzer analyzer, BinaryPredicate predicate)
throws ImpalaException {
private UnboundPredicate<Object> convertIcebergPredicate(Analyzer analyzer,
BinaryPredicate predicate) throws ImpalaException {
Operation op = getIcebergOperator(predicate.getOp());
if (op == null) return;
if (op == null) {
return null;
}

// Do not convert if there is an implicit cast
if (!(predicate.getChild(0) instanceof SlotRef)) return;
if (!(predicate.getChild(0) instanceof SlotRef)) {
return null;
}
SlotRef ref = (SlotRef) predicate.getChild(0);

if (!(predicate.getChild(1) instanceof LiteralExpr)) return;
if (!(predicate.getChild(1) instanceof LiteralExpr)) {
return null;
}
LiteralExpr literal = (LiteralExpr) predicate.getChild(1);

// If predicate contains map/struct, this column would be null
Column col = ref.getDesc().getColumn();
if (col == null) return;
if (col == null) {
return null;
}

Object value = getIcebergValue(analyzer, ref, literal);
if (value == null) return;
if (value == null) {
return null;
}

icebergPredicates_.add(Expressions.predicate(op, col.getName(), value));
return Expressions.predicate(op, col.getName(), value);
}

private void convertIcebergPredicate(Analyzer analyzer, InPredicate predicate)
throws ImpalaException {
private UnboundPredicate<Object> convertIcebergPredicate(Analyzer analyzer,
InPredicate predicate) throws ImpalaException {
// TODO: convert NOT_IN predicate
if (predicate.isNotIn()) return;
if (predicate.isNotIn()) {
return null;
}

// Do not convert if there is an implicit cast
if (!(predicate.getChild(0) instanceof SlotRef)) return;
if (!(predicate.getChild(0) instanceof SlotRef)) {
return null;
}
SlotRef ref = (SlotRef) predicate.getChild(0);

// If predicate contains map/struct, this column would be null
Column col = ref.getDesc().getColumn();
if (col == null) return;
if (col == null) {
return null;
}

// Expressions takes a list of values as Objects
List<Object> values = new ArrayList<>();
for (int i = 1; i < predicate.getChildren().size(); ++i) {
if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) return;
if (!Expr.IS_LITERAL.apply(predicate.getChild(i))) {
return null;
}
LiteralExpr literal = (LiteralExpr) predicate.getChild(i);

// Cannot push IN or NOT_IN predicate with null literal values
if (Expr.IS_NULL_LITERAL.apply(literal)) return;
if (Expr.IS_NULL_LITERAL.apply(literal)) {
return null;
}

Object value = getIcebergValue(analyzer, ref, literal);
if (value == null) return;
if (value == null) {
return null;
}

values.add(value);
}

icebergPredicates_.add(Expressions.in(col.getName(), values));
return Expressions.in(col.getName(), values);
}

private void convertIcebergPredicate(IsNullPredicate predicate) {
private UnboundPredicate<Object> convertIcebergPredicate(IsNullPredicate predicate) {
// Do not convert if there is an implicit cast
if (!(predicate.getChild(0) instanceof SlotRef)) return;
if (!(predicate.getChild(0) instanceof SlotRef)) {
return null;
}
SlotRef ref = (SlotRef) predicate.getChild(0);

// If predicate contains map/struct, this column would be null
Column col = ref.getDesc().getColumn();
if (col == null) return;
if (col == null) {
return null;
}

if (predicate.isNotNull()) {
icebergPredicates_.add(Expressions.notNull(col.getName()));
return Expressions.notNull(col.getName());
} else{
icebergPredicates_.add(Expressions.isNull(col.getName()));
return Expressions.isNull(col.getName());
}
}

/**
 * Converts an Impala CompoundPredicate (NOT/AND/OR) into an Iceberg
 * expression by recursively converting its children.
 *
 * All children must be convertible: a partially-converted AND/OR (or a NOT
 * over an unconvertible child) would change query semantics, so null is
 * returned for the whole compound predicate in that case.
 *
 * @return the converted Iceberg expression, or null if the operator or any
 *     required child cannot be converted
 */
private Expression convertIcebergPredicate(Analyzer analyzer,
    CompoundPredicate predicate) throws ImpalaException {
  Operation op = getIcebergOperator(predicate.getOp());
  if (op == null) {
    return null;
  }

  Expression left = convertIcebergPredicate(analyzer, predicate.getChild(0));
  if (left == null) {
    return null;
  }
  // NOT is unary: negate the single converted child. Compare enums with '=='
  // (null-safe and type-checked) rather than equals().
  if (op == Operation.NOT) {
    return Expressions.not(left);
  }

  Expression right = convertIcebergPredicate(analyzer, predicate.getChild(1));
  if (right == null) {
    return null;
  }
  return op == Operation.AND ? Expressions.and(left, right)
      : Expressions.or(left, right);
}

private Object getIcebergValue(Analyzer analyzer, SlotRef ref, LiteralExpr literal)
Expand Down
50 changes: 24 additions & 26 deletions fe/src/main/java/org/apache/impala/util/IcebergUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package org.apache.impala.util;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.flatbuffers.FlatBufferBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
Expand All @@ -32,36 +40,20 @@
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.flatbuffers.FlatBufferBuilder;

import org.apache.impala.common.Pair;
import org.apache.impala.fb.FbFileMetadata;
import org.apache.impala.fb.FbIcebergDataFileFormat;
import org.apache.impala.fb.FbIcebergMetadata;
import org.apache.impala.fb.FbIcebergPartitionTransformValue;
import org.apache.impala.fb.FbIcebergTransformType;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
import org.apache.iceberg.types.Conversions;
Expand All @@ -77,12 +69,18 @@
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.iceberg.IcebergCatalog;
import org.apache.impala.catalog.iceberg.IcebergCatalogs;
import org.apache.impala.catalog.iceberg.IcebergHadoopCatalog;
import org.apache.impala.catalog.iceberg.IcebergHadoopTables;
import org.apache.impala.catalog.iceberg.IcebergHiveCatalog;
import org.apache.impala.catalog.iceberg.IcebergCatalog;
import org.apache.impala.catalog.iceberg.IcebergCatalogs;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.fb.FbFileMetadata;
import org.apache.impala.fb.FbIcebergDataFileFormat;
import org.apache.impala.fb.FbIcebergMetadata;
import org.apache.impala.fb.FbIcebergPartitionTransformValue;
import org.apache.impala.fb.FbIcebergTransformType;
import org.apache.impala.thrift.TCompressionCodec;
import org.apache.impala.thrift.THdfsCompression;
import org.apache.impala.thrift.THdfsFileFormat;
Expand Down Expand Up @@ -523,17 +521,17 @@ public static HdfsFileFormat toHdfsFileFormat(String format) {
* DataFiles in the first element.
*/
public static Pair<List<DataFile>, Boolean> getIcebergDataFiles(FeIcebergTable table,
List<UnboundPredicate> predicates, TimeTravelSpec timeTravelSpec)
List<Expression> predicates, TimeTravelSpec timeTravelSpec)
throws TableLoadingException {
if (table.snapshotId() == -1) return new Pair<>(Collections.emptyList(), false);

TableScan scan = createScanAsOf(table, timeTravelSpec);
for (UnboundPredicate predicate : predicates) {
for (Expression predicate : predicates) {
scan = scan.filter(predicate);
}

List<DataFile> dataFileList = new ArrayList<>();
Boolean hasDeleteFile = false;
boolean hasDeleteFile = false;
try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
for (FileScanTask task : fileScanTasks) {
if (!task.deletes().isEmpty()) {
Expand Down
Loading

0 comments on commit b58966b

Please sign in to comment.