
push down min/max/count to iceberg #5872

Closed

Conversation

huaxingao
Contributor

@huaxingao huaxingao commented Sep 27, 2022

This PR pushes down min/max/count to iceberg.

For SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table, without this PR, iceberg will do SELECT col FROM table and Spark will calculate MIN(col), MAX(col), COUNT(col), COUNT(*). With this PR, iceberg will do SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table. MIN, MAX, COUNT will be calculated on the iceberg side using the statistics info in the manifest files.

This PR makes the following changes:

  1. Add a table property AGGREGATE_PUSHDOWN_ENABLED. The default is true.
  2. Make SparkScanBuilder implement SupportsPushDownAggregates, so MIN/MAX/COUNT can be pushed down to iceberg; iceberg then reads the statistics information (upper_bound, lower_bound, record_count) from the manifest file, calculates MIN/MAX/COUNT, builds a Spark InternalRow, and passes it to Spark (a minimal sketch of this hook follows the list).
  3. The push-down decision is made in SparkScanBuilder.pushAggregation. If any of the aggregates can't be pushed down, e.g. because upper_bound, lower_bound, or record_count are not available, we fall back.
  4. Add a SparkLocalScan. It is a special Scan that runs locally on the Spark driver instead of on the executors. If MIN/MAX/COUNT are pushed down, iceberg creates a SparkLocalScan and doesn't need to plan files, create FileScanTasks, and send the tasks to executors; it can just do a local scan on the Spark driver.
  5. SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.DATA_FILES) is used to get the statistics info (upper_bound, lower_bound, record_count); max, min, or count are then calculated from those statistics, and an InternalRow is built and returned to Spark.
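
For reference, a minimal sketch of what the Spark DSv2 SupportsPushDownAggregates hook looks like in a scan builder. The class name and the fallback checks below are illustrative only, not the PR's actual code:

import org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc;
import org.apache.spark.sql.connector.expressions.aggregate.Aggregation;
import org.apache.spark.sql.connector.expressions.aggregate.Count;
import org.apache.spark.sql.connector.expressions.aggregate.CountStar;
import org.apache.spark.sql.connector.expressions.aggregate.Max;
import org.apache.spark.sql.connector.expressions.aggregate.Min;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.SupportsPushDownAggregates;

// illustrative skeleton only; the real SparkScanBuilder carries much more state
class AggregatePushdownSketch implements SupportsPushDownAggregates {

  @Override
  public boolean pushAggregation(Aggregation aggregation) {
    for (AggregateFunc func : aggregation.aggregateExpressions()) {
      boolean supported =
          func instanceof Min
              || func instanceof Max
              || func instanceof Count
              || func instanceof CountStar;
      if (!supported) {
        return false; // fall back: Spark computes the aggregates itself
      }
    }

    // here the PR converts the Spark aggregates to iceberg expressions,
    // evaluates them against manifest statistics, and keeps the result
    // so that build() can return a driver-local scan
    return true;
  }

  @Override
  public Scan build() {
    throw new UnsupportedOperationException("sketch only");
  }
}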

In the tests, I check the explain output of the physical plan to verify whether MIN/MAX/COUNT are pushed down.

For example, SELECT max(data), min(data) FROM table
If MIN/MAX/COUNT are not pushed down

== Optimized Logical Plan ==
Aggregate [max(data#146) AS max(data)#150, min(data#146) AS min(data)#151, count(data#146) AS count(data)#152L]
+- RelationV2[data#146] spark_catalog.default.table

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max(data#146), min(data#146), count(data#146)], output=[max(data)#150, min(data)#151, count(data)#152L])
   +- SortAggregate(key=[], functions=[partial_max(data#146), partial_min(data#146), partial_count(data#146)], output=[max#165, min#166, count#167L])
      +- BatchScan[data#146] spark_catalog.default.table [filters=] RuntimeFilters: []

If MIN/MAX/COUNT are pushed down

== Optimized Logical Plan ==
Aggregate [max(MAX(data)#440) AS max(data)#429, min(MIN(data)#441) AS min(data)#430, sum(COUNT(data)#442L) AS count(data)#431L]
+- RelationV2[MAX(data)#440, MIN(data)#441, COUNT(data)#442L] spark_catalog.default.table

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortAggregate(key=[], functions=[max(MAX(data)#440), min(MIN(data)#441), sum(COUNT(data)#442L)], output=[max(data)#429, min(data)#430, count(data)#431L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#297]
      +- SortAggregate(key=[], functions=[partial_max(MAX(data)#440), partial_min(MIN(data)#441), partial_sum(COUNT(data)#442L)], output=[max#446, min#447, sum#448L])
         +- LocalTableScan [MAX(data)#440, MIN(data)#441, COUNT(data)#442L]

I check the physical plan to see if it contains MAX(data)/MIN(data)/COUNT(data).
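
For instance, a hedged sketch of how such a check could be written against the EXPLAIN output; the table name and the assertion are illustrative, not the PR's actual test code:

// run EXPLAIN through SQL and look for the pushed-down aggregate columns
String plan =
    spark.sql("EXPLAIN SELECT max(data), min(data), count(data) FROM default.table")
        .head()
        .getString(0);

// with pushdown, the scan node exposes MAX(data)/MIN(data)/COUNT(data) directly
boolean pushedDown =
    plan.contains("MAX(data)") && plan.contains("MIN(data)") && plan.contains("COUNT(data)");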

@huaxingao
Contributor Author

cc @aokolnychyi @szehon-ho @flyrain

@huaxingao
Contributor Author

also cc @kbendick @rdblue

@@ -349,4 +349,7 @@ private TableProperties() {}

public static final String UPSERT_ENABLED = "write.upsert.enabled";
public static final boolean UPSERT_ENABLED_DEFAULT = false;

public static final String AGGREGATE_PUSHDOWN_ENABLED = "aggregate.pushdown.enabled";
public static final String AGGREGATE_PUSHDOWN_ENABLED_DEFAULT = "false";
Contributor

If we have a nice fallback for any aggregation that cannot rely completely on the statistics, we can set the default to true, or remove the property.

Contributor Author

Thanks for your comment!

I actually thought about this when I wrote the code. The aggregate push-down logic is decided inside SparkScanBuilder. I was debating whether I should build the aggregates row inside SparkScanBuilder or SparkLocalScan. It seemed more natural to build the aggregates row in SparkLocalScan, so I put it there, but if I move this to SparkScanBuilder, then when I build the aggregates row from the statistics, I can fall back if the statistics are not available. I will change to that approach.

@Zhangg7723
Contributor

Did you consider equality delete files in a V2 table?

@huaxingao
Contributor Author

Did you consider equality delete files in a V2 table?

For merge-on-read, we need to check position/equality deletes and fall back if there are delete files. I currently disable min/max/count push-down for merge-on-read.

Contributor

@flyrain flyrain left a comment

Thanks @huaxingao for the PR. Left some comments.

*/
package org.apache.iceberg.expressions;

public abstract class Aggregate<T, C extends Term> implements Expression {
Contributor

Can we remove the first type parameter T? Looks like it is not necessary.

Contributor Author

Removed. Thanks

*/
package org.apache.iceberg.expressions;

public abstract class Aggregate<T, C extends Term> implements Expression {
Contributor

Can we add a Javadoc to the class?

Contributor Author

Added

private Filter[] pushedFilters = NO_FILTERS;
private Aggregation pushedAggregations;
Contributor

The variable is not used.

Contributor Author

Deleted. Thanks!

Comment on lines 181 to 191
String deleteMode = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
String updateMode = table.properties().getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
String mergeMode = table.properties().getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
// the statistics might be changed for merge on read and can't be used to calculate
// min/max/count, so disable aggregate push down
// Todo: enable aggregate push down if there are not deletes files for merge-on-read
if (deleteMode.equals("merge-on-read")
    || updateMode.equals("merge-on-read")
    || mergeMode.equals("merge-on-read")) {
  return false;
}
Contributor

The best way to decide whether to fall back would be to check if there are row-level deletes in the current snapshot. Checking the table properties may not be good enough. For example, a spec v2 table with COW for delete/update/merge modes may still have row-level deletes, since the mode can be changed to MOR as a write option. Can we also add this situation as a test?

Contributor Author

I changed this to

    Map<String, String> map = table.currentSnapshot().summary();
    if (Integer.parseInt(map.get("total-position-deletes")) > 0
        || Integer.parseInt(map.get("total-equality-deletes")) > 0) {
      return false;
    }

I am not sure if this is correct.
I added a test for this, but somehow the test failed at the delete. I've ignored the test for now and will figure out why the delete failed.

Comment on lines 327 to 336
// if aggregates are pushed down, instead of constructing a SparkBatchQueryScan, creating file
// read tasks and sending over the tasks to Spark executors, a SparkLocalScan will be created
// and the scan is done locally on the Spark driver instead of the executors. The statistics
// info will be retrieved from manifest file and used to build a Spark internal row, which
// contains the pushed down aggregate values.
if (pushedAggregateRows != null) {
  return new SparkLocalScan(
      table, aggregateExpressions, pushedAggregateSchema, pushedAggregateRows);
}

Contributor

@flyrain flyrain Oct 4, 2022

This is in the middle of the batch scan build. I don't think it is a good idea to put it here. I'd recommend something like this:

public Scan build() {
   if (pushedAggregateRows != null) {
      return new SparkLocalScan(...);
   } else {
      return buildBatchScan(...);
   }
}

Contributor Author

Fixed. Thanks!

}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
public static InternalRow[] constructInternalRowForPushedDownAggregate(
Contributor

Nit: add a Javadoc describing the general idea of the method?

Contributor Author

Added. Thanks

Dataset dataset =
    metadataRows.selectExpr(
        "lower_bounds", "upper_bounds", "record_count", "null_value_counts");
Row[] staticticRows = (Row[]) dataset.collect();
Contributor

typo: staticticRows

Contributor

Wondering how it performs for a large table with a large number of data files.
The current algorithm's complexity is about O(#fields × #rows). Maybe that's fine.

Contributor Author

Please let me know if there is a better way to do this.

  expressions.add(expr);
  pushed.add(aggregate);
} catch (ValidationException e) {
  // binding to the table schema failed, so this expression cannot be pushed down
Contributor

Log a warn message here?

Contributor Author

Fixed. Thanks

@@ -349,4 +349,7 @@ private TableProperties() {}

public static final String UPSERT_ENABLED = "write.upsert.enabled";
public static final boolean UPSERT_ENABLED_DEFAULT = false;

public static final String AGGREGATE_PUSHDOWN_ENABLED = "aggregate.pushdown.enabled";
Contributor

Why would this be done at a table level rather than in the query engine config?


import org.apache.iceberg.StructLike;

public class BoundAggregate<T> extends Aggregate<BoundTerm<T>> implements Bound<T> {
Contributor

I don't think these types are correct. If the aggregate is over a BoundTerm that produces T, there is no guarantee that the aggregate also produces T. For example, Count may wrap BoundTerm<String> and produce an Integer.

  BoundTerm<T> bound = term().bind(struct, caseSensitive);
  return new BoundAggregate<>(op(), bound);
} else {
  return new BoundAggregate<>(op(), null); // Count (*)
Contributor

Would it be safer to check the operation and then add a precondition that the term is non-null?

if (op() == COUNT_STAR) {
  return new BoundAggregate<>(op(), null);
} else {
  Preconditions.checkArgument(term() != null, ...);
  BoundTerm<T> bound = term().bind(struct, caseSensitive);
  return new BoundAggregate<>(op(), bound);
}


public class SparkAggregates {

private static final Pattern BACKTICKS_PATTERN = Pattern.compile("([`])(.|$)");
Contributor

Is this taken from existing code? If so, can we reuse or refactor that rather than introducing another one here?

    return Expressions.countStar();
  case MAX:
    Max maxAgg = (Max) aggregate;
    return Expressions.max(unquote(maxAgg.column().describe()));
Contributor

This looks suspicious. Why call describe only to unquote the result? Why not get the parts individually and join them using .?
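
Presumably something along these lines; a hedged sketch that assumes the aggregate's column is a NamedReference:

// join the reference's name parts with '.' instead of describe() + unquote
NamedReference ref = (NamedReference) maxAgg.column();
return Expressions.max(String.join(".", ref.fieldNames()));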

* use these to calculate Max/Min/Count, and then use the values of Max/Min/Count to construct an
* InternalRow to return to Spark.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
Contributor

We prefer not to add this suppression for new code. It usually means the code should be refactored into simpler component parts.

@@ -55,6 +55,14 @@ public <T> R predicate(BoundPredicate<T> pred) {
  public <T> R predicate(UnboundPredicate<T> pred) {
    return null;
  }

  public <T> R aggregate(BoundAggregate<T> agg) {
    return null;
Contributor

Is it safe to return null for all visitors that may have an Aggregate passed in through Expression?

I think we may want to throw UnsupportedOperationException to ensure that aggregates are either explicitly supported or will be rejected.
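
A minimal sketch of that suggestion (the message wording is illustrative):

public <T> R aggregate(BoundAggregate<T> agg) {
  throw new UnsupportedOperationException("Cannot visit aggregate expression: " + agg);
}

public <T> R aggregate(UnboundAggregate<T> agg) {
  throw new UnsupportedOperationException("Cannot visit aggregate expression: " + agg);
}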

public boolean pushAggregation(Aggregation aggregation) {
  if (!(table instanceof BaseTable)) {
    return false;
  }
Contributor

@rdblue rdblue Oct 9, 2022

Style: Please add whitespace throughout this PR. We add an empty line between a control flow block (if, else, while, for, etc.) and the following statement:

if (condition) {
  // control flow block
}

variable = result; // following statement

if (!(table instanceof BaseTable)) {
  return false;
}
boolean aggregatePushdown =
Contributor

I think a better name would be to end with "Enabled". That makes it clear what the boolean is testing.

// if there are row-level deletes in current snapshot, the statics
// maybe changed, so disable push down aggregate
if (Integer.parseInt(map.get("total-position-deletes")) > 0
    || Integer.parseInt(map.get("total-equality-deletes")) > 0) {
Contributor

Can you use getOrDefault to handle cases where the key isn't present?
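
A hedged sketch of that suggestion (variable names are illustrative):

// treat a missing summary key as zero instead of letting parseInt throw
Map<String, String> summary = table.currentSnapshot().summary();
long positionDeletes = Long.parseLong(summary.getOrDefault("total-position-deletes", "0"));
long equalityDeletes = Long.parseLong(summary.getOrDefault("total-equality-deletes", "0"));

if (positionDeletes > 0 || equalityDeletes > 0) {
  return false; // statistics may not reflect row-level deletes, so fall back
}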

// in manifest files cannot be used to calculate min/max/count. However, if the
// group by expression is not the same as the partition, the statistics information can still
// be used to calculate min/max/count.
// Todo: enable aggregate push down for partition col group by expression
Contributor

I agree this can be done in a follow up PR. This is why I added ExpressionUtil.selectsPartitions method. You should be able to use that to determine whether the aggregate filter is aligned with partitioning.

Nit: should be "TODO"

} else {
  // only push down aggregates iff all of them can be pushed down.
  LOG.info(
      "Failed to convert this aggregate function to iceberg Aggregate, can't push down aggregate to iceberg",
Contributor

These error messages can be more concise:

Failed to convert aggregate expression: %s

You could also say "Cannot push down aggregate expression"

Also, should these be INFO statements? I think it makes sense to have a message stating whether the group could be pushed down, but a message for each expression seems too verbose.

// binding to the table schema failed, so this expression cannot be pushed down
// disable aggregate push down
LOG.info(
    "Failed to bind expression to table schema, can't push down aggregate to iceberg",
Contributor

How about "Failed to bind" or "Cannot push down aggregate (failed to bind): %s"?

pushedAggregateSchema =
    SparkPushedDownAggregateUtil.buildSchemaForPushedDownAggregate(
        aggregateExpressions, caseSensitive, schema);
if (pushedAggregateSchema != null
Contributor

What if the aggregation can't be pushed down because of missing metadata? For example, if lower/upper bounds are missing for a column because the metrics mode doesn't track the metadata?

    type = schema.columns().get(i).type();
  }
}
return SparkSchemaUtil.convert(type);
Contributor

This seems too complicated to me. Why not use schema.findType(name)? If you want to ensure that you don't consider nested columns, then you can also use schema.asStruct().field(name).type()
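
For illustration, the two lookups mentioned above (a sketch; name and type come from the surrounding snippet):

// resolve by full column name, including nested columns
Type type = schema.findType(name);

// or restrict the lookup to top-level columns only
Type topLevelType = schema.asStruct().field(name).type();

return SparkSchemaUtil.convert(type);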

private SparkPushedDownAggregateUtil() {}

// Build schema for pushed down aggregates. This schema will be used as the scan schema.
public static StructType buildSchemaForPushedDownAggregate(
Contributor

Can each BoundAggregate return the type it produces? It should either be the type of the underlying term or Long:

  public Type type() {
    if (op() == COUNT || op() == COUNT_STAR) {
      return LongType.get();
    } else {
      return term.type();
    }
  }

With that information, you should be able to produce a schema more easily.
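
For example, something along these lines could build the scan schema once the aggregates expose their result types; a hedged sketch where the field ids and field names are illustrative:

// build an iceberg Schema from the aggregate result types, then convert once
List<Types.NestedField> fields = Lists.newArrayList();
for (int i = 0; i < aggregates.size(); i++) {
  BoundAggregate<?> agg = aggregates.get(i);
  String name = agg.op().name() + "_" + i; // illustrative field name only
  fields.add(Types.NestedField.optional(i + 1, name, agg.type()));
}

return SparkSchemaUtil.convert(new Schema(fields));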

  // down because the statistic info for complex are not available.
  return finalSchema;
}
if ((aggregates.get(index)).op().name().equals("COUNT")) {
Contributor

Can you use a switch when working with an operation? I don't understand why this would use string comparison.
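
A hedged sketch of that switch; fieldName and columnType stand in for whatever the surrounding code already computes:

switch (aggregates.get(index).op()) {
  case COUNT:
  case COUNT_STAR:
    // counts are always long
    fields.add(DataTypes.createStructField(fieldName, DataTypes.LongType, true));
    break;
  case MAX:
  case MIN:
    // min/max keep the column's own Spark type
    fields.add(DataTypes.createStructField(fieldName, columnType, true));
    break;
  default:
    throw new UnsupportedOperationException(
        "Unsupported aggregate: " + aggregates.get(index).op());
}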

    || dataType instanceof ArrayType
    || dataType instanceof MapType) {
  // not building pushed down aggregate schema for complex types to disable aggregate push
  // down because the statistic info for complex are not available.
Contributor

Stats for nested structs are kept, aren't they?

    || dataType instanceof MapType) {
  // not building pushed down aggregate schema for complex types to disable aggregate push
  // down because the statistic info for complex are not available.
  return finalSchema;
Contributor

It is fine if this doesn't support nested types, but it isn't correct for this function to return the unfinished schema. Instead it should throw a ValidationException that aborts the aggregate pushdown.
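
A minimal sketch of that (the message text is illustrative):

if (dataType instanceof StructType
    || dataType instanceof ArrayType
    || dataType instanceof MapType) {
  // statistics are not tracked for complex types, so abort the pushdown
  throw new ValidationException("Cannot push down aggregates for complex type: %s", dataType);
}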

}
}
}
return finalSchema;
Contributor

I think that this should be done in Iceberg code rather than in Spark code. This can produce an Iceberg schema and then convert it. That would allow us to more easily reuse this for other engines.

// get the statistics info from DATA_FILES, calculate the aggregate values (min/max/count)
// and use these value to build an internalRow.
Dataset<Row> metadataRows =
    SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.DATA_FILES);
Contributor

I think there should be a flag to enable/disable running a parallel operation from within job planning. Nesting a DataFrame operation like this does not seem like a good idea in all cases. I can certainly see why we want to do this, but I think we should have an implementation that uses planFiles first and then add this as an optimization.


@Override
public <T> Expression aggregate(UnboundAggregate<T> agg) {
  return agg.bind(struct, caseSensitive);
Contributor

Can you update the Binder tests to include aggregate expressions?

List<Object> valuesInSparkInternalRow = Lists.newArrayList();

for (int i = 0; i < fields.length; i++) {
  if (fields[i].name().contains("MIN")) {
Contributor

The field name should not be used for this. It is going to be very slow. Instead, I think this should use the aggregate expressions. Why not implement eval for the aggregate expressions and just delegate to that?
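
A hedged sketch of the direction being suggested, assuming the bound aggregates gain an eval-style method over the collected file statistics; the eval call and the statsRow variable are hypothetical:

// drive the output row from the pushed aggregate expressions instead of
// string-matching on field names
Object[] values = new Object[aggregateExpressions.size()];
for (int i = 0; i < aggregateExpressions.size(); i++) {
  values[i] = aggregateExpressions.get(i).eval(statsRow); // hypothetical eval over file stats
}

InternalRow row = new GenericInternalRow(values);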

Dataset dataset =
    metadataRows.selectExpr(
        "lower_bounds", "upper_bounds", "record_count", "null_value_counts");
Row[] statisticRows = (Row[]) dataset.collect();
Contributor

Why not parallelize the aggregation on each task to minimize the number of rows (and remove maps) returned to the driver?

Contributor

@rdblue rdblue left a comment

@huaxingao, this is a great start.

I'd like to get the expression changes in before getting the Spark changes in, if that's okay with you. Those are nearly ready to go. I think the implementation in SparkPushedDownAggregateUtil is what needs to change the most. A lot of that can be done more cleanly (and more efficiently) by using the eval method on the expressions.

@huaxingao
Contributor Author

@rdblue Thank you very much for your detailed review! I will address the comments. I will split this PR and get the expression changes ready first.
