push down min/max/count to iceberg #5872
Conversation
@@ -349,4 +349,7 @@ private TableProperties() {}

  public static final String UPSERT_ENABLED = "write.upsert.enabled";
  public static final boolean UPSERT_ENABLED_DEFAULT = false;

  public static final String AGGREGATE_PUSHDOWN_ENABLED = "aggregate.pushdown.enabled";
  public static final String AGGREGATE_PUSHDOWN_ENABLED_DEFAULT = "false";
If we have a nice fallback for any aggregation that cannot rely completely on the statistics, we can set the default to true, or remove the property.
Thanks for your comment!
I actually thought about this when I wrote the code. The aggregate push down logic is decided inside SparkScanBuilder. I was debating whether I should build the aggregates row inside SparkScanBuilder or SparkLocalScan. It seemed more natural to build the aggregates row in SparkLocalScan, so I put it there. But if I move this to SparkScanBuilder, then when I build the aggregates row using statistics, I can fall back if the statistics are not available. I will change to that approach.
Did you consider equality delete files in V2 tables?
For merge on read, we need to check pos/equality deletes and fall back if there are delete files. I currently disabled min/max/count push down for merge on read.
Thanks @huaxingao for the PR. Left some comments.
 */
package org.apache.iceberg.expressions;

public abstract class Aggregate<T, C extends Term> implements Expression {
Can we remove the first type parameter T? Looks like it is not necessary.
Removed. Thanks
 */
package org.apache.iceberg.expressions;

public abstract class Aggregate<T, C extends Term> implements Expression {
Can we add a Javadoc to the class?
Added
  private Filter[] pushedFilters = NO_FILTERS;
  private Aggregation pushedAggregations;
The variable is not used.
Deleted. Thanks!
    String deleteMode = table.properties().getOrDefault(DELETE_MODE, DELETE_MODE_DEFAULT);
    String updateMode = table.properties().getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT);
    String mergeMode = table.properties().getOrDefault(MERGE_MODE, MERGE_MODE_DEFAULT);
    // the statistics might be changed for merge on read and can't be used to calculate
    // min/max/count, so disable aggregate push down
    // Todo: enable aggregate push down if there are no delete files for merge-on-read
    if (deleteMode.equals("merge-on-read")
        || updateMode.equals("merge-on-read")
        || mergeMode.equals("merge-on-read")) {
      return false;
    }
The best way to decide whether to fall back would be to check if there are row-level deletes in the current snapshot. Checking the table properties may not be good enough. For example, a spec v2 table with COW for delete/update/merge modes may still have row-level deletes, since it can be changed to MOR as a write option. Can we also add this situation as a test?
I changed this to
Map<String, String> map = table.currentSnapshot().summary();
if (Integer.parseInt(map.get("total-position-deletes")) > 0
|| Integer.parseInt(map.get("total-equality-deletes")) > 0) {
return false;
}
I am not sure if this is correct.
I added a test for this, but somehow the test failed at the delete. I have ignored the test for now and will figure out why the delete failed.
    // if aggregates are pushed down, instead of constructing a SparkBatchQueryScan, creating file
    // read tasks and sending over the tasks to Spark executors, a SparkLocalScan will be created
    // and the scan is done locally on the Spark driver instead of the executors. The statistics
    // info will be retrieved from manifest file and used to build a Spark internal row, which
    // contains the pushed down aggregate values.
    if (pushedAggregateRows != null) {
      return new SparkLocalScan(
          table, aggregateExpressions, pushedAggregateSchema, pushedAggregateRows);
    }
This is in the middle of the batch scan build. I don't think it is a good idea to put it here. I'd recommend something like this:
public Scan build() {
if (pushedAggregateRows != null) {
return new SparkLocalScan(...);
} else {
return buildBatchScan(...);
}
}
Fixed. Thanks!
  }

  @SuppressWarnings("checkstyle:CyclomaticComplexity")
  public static InternalRow[] constructInternalRowForPushedDownAggregate(
Nit: A Java doc to describe the general idea of the method?
Added. Thanks
    Dataset dataset =
        metadataRows.selectExpr(
            "lower_bounds", "upper_bounds", "record_count", "null_value_counts");
    Row[] staticticRows = (Row[]) dataset.collect();
typo: staticticRows
Wondering how it performs in case of a large table with a large number of data files.
The current algo complexity is about O(field# * row#). Maybe that's fine.
Please let me know if there is a better way to do this.
        expressions.add(expr);
        pushed.add(aggregate);
      } catch (ValidationException e) {
        // binding to the table schema failed, so this expression cannot be pushed down
Log a warn message here?
Fixed. Thanks
@@ -349,4 +349,7 @@ private TableProperties() {}

  public static final String UPSERT_ENABLED = "write.upsert.enabled";
  public static final boolean UPSERT_ENABLED_DEFAULT = false;

  public static final String AGGREGATE_PUSHDOWN_ENABLED = "aggregate.pushdown.enabled";
Why would this be done at a table level rather than in the query engine config?
import org.apache.iceberg.StructLike;

public class BoundAggregate<T> extends Aggregate<BoundTerm<T>> implements Bound<T> {
I don't think these types are correct. If the aggregate is over a BoundTerm that produces T, there is no guarantee that the aggregate also produces T. For example, Count may wrap BoundTerm<String> and produce an Integer.
      BoundTerm<T> bound = term().bind(struct, caseSensitive);
      return new BoundAggregate<>(op(), bound);
    } else {
      return new BoundAggregate<>(op(), null); // Count (*)
Would it be safer to check the operation and then add a precondition that the term is non-null?
if (op() == COUNT_STAR) {
return new BoundAggregate<>(op(), null);
} else {
  Preconditions.checkArgument(term() != null, ...);
BoundTerm<T> bound = term().bind(struct, caseSensitive);
return new BoundAggregate<>(op(), bound);
}
public class SparkAggregates {

  private static final Pattern BACKTICKS_PATTERN = Pattern.compile("([`])(.|$)");
Is this taken from existing code? If so, can we reuse or refactor that rather than introducing another one here?
        return Expressions.countStar();
      case MAX:
        Max maxAgg = (Max) aggregate;
        return Expressions.max(unquote(maxAgg.column().describe()));
This looks suspicious. Why call describe only to unquote the result? Why not get the parts individually and join them using "."?
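A minimal sketch of that alternative, assuming maxAgg.column() is a simple column reference (Spark's NamedReference, which exposes its name parts via fieldNames()); anything more complex would presumably not be pushed down:

      case MAX:
        Max maxAgg = (Max) aggregate;
        if (maxAgg.column() instanceof NamedReference) {
          NamedReference ref = (NamedReference) maxAgg.column();
          // join the raw name parts directly instead of describing and then unquoting
          return Expressions.max(String.join(".", ref.fieldNames()));
        }
        return null; // not a simple column reference, so don't push down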
 * use these to calculate Max/Min/Count, and then use the values of Max/Min/Count to construct an
 * InternalRow to return to Spark.
 */
@SuppressWarnings("checkstyle:CyclomaticComplexity")
We prefer not to add this suppression for new code. It usually means the code should be refactored to simpler component parts.
@@ -55,6 +55,14 @@ public <T> R predicate(BoundPredicate<T> pred) {

  public <T> R predicate(UnboundPredicate<T> pred) {
    return null;
  }

  public <T> R aggregate(BoundAggregate<T> agg) {
    return null;
Is it safe to return null for all visitors that may have an Aggregate passed in through Expression?
I think we may want to throw UnsupportedOperationException to ensure that aggregates are either explicitly supported or will be rejected.
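A minimal sketch of that stricter default, following the shape of the existing visitor methods in the diff above:

  public <T> R aggregate(BoundAggregate<T> agg) {
    // reject aggregates by default; visitors that support them must override this
    throw new UnsupportedOperationException(
        "Cannot visit aggregate expression: " + agg + "; this visitor does not support aggregates");
  }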
  public boolean pushAggregation(Aggregation aggregation) {
    if (!(table instanceof BaseTable)) {
      return false;
    }
Style: Please add whitespace throughout this PR. We add an empty line between a control flow block (if, else, while, for, etc.) and the following statement:
if (condition) {
// control flow block
}
variable = result; // following statement
    if (!(table instanceof BaseTable)) {
      return false;
    }
    boolean aggregatePushdown =
I think a better name would be to end with "Enabled". That makes it clear what the boolean is testing.
    // if there are row-level deletes in the current snapshot, the statistics
    // may be changed, so disable push down aggregate
    if (Integer.parseInt(map.get("total-position-deletes")) > 0
        || Integer.parseInt(map.get("total-equality-deletes")) > 0) {
Can you use getOrDefault to handle cases where the key isn't present?
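A minimal sketch of that check with getOrDefault, assuming the same snapshot summary keys used in the diff above; a missing key is treated as zero deletes:

    Map<String, String> summary = table.currentSnapshot().summary();
    long posDeletes = Long.parseLong(summary.getOrDefault("total-position-deletes", "0"));
    long eqDeletes = Long.parseLong(summary.getOrDefault("total-equality-deletes", "0"));

    // statistics may not reflect row-level deletes, so fall back to the regular scan
    if (posDeletes > 0 || eqDeletes > 0) {
      return false;
    }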
    // in manifest files cannot be used to calculate min/max/count. However, if the
    // group by expression is not the same as the partition, the statistics information can still
    // be used to calculate min/max/count.
    // Todo: enable aggregate push down for partition col group by expression
I agree this can be done in a follow-up PR. This is why I added the ExpressionUtil.selectsPartitions method. You should be able to use that to determine whether the aggregate filter is aligned with partitioning.
Nit: should be "TODO"
    } else {
      // only push down aggregates iff all of them can be pushed down.
      LOG.info(
          "Failed to convert this aggregate function to iceberg Aggregate, can't push down aggregate to iceberg",
These error messages can be more concise:
Failed to convert aggregate expression: %s
You could also say "Cannot push down aggregate expression"
Also, should these be INFO statements? I think it makes sense to have a message stating whether the group could be pushed down, but a message for each expression seems too verbose.
      // binding to the table schema failed, so this expression cannot be pushed down
      // disable aggregate push down
      LOG.info(
          "Failed to bind expression to table schema, can't push down aggregate to iceberg",
How about "Failed to bind" or "Cannot push down aggregate (failed to bind): %s"?
    pushedAggregateSchema =
        SparkPushedDownAggregateUtil.buildSchemaForPushedDownAggregate(
            aggregateExpressions, caseSensitive, schema);
    if (pushedAggregateSchema != null
What if the aggregation can't be pushed down because of missing metadata? For example, if lower/upper bounds are missing for a column because the metrics mode doesn't track the metadata?
        type = schema.columns().get(i).type();
      }
    }
    return SparkSchemaUtil.convert(type);
This seems too complicated to me. Why not use schema.findType(name)? If you want to ensure that you don't consider nested columns, then you can also use schema.asStruct().field(name).type().
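A minimal sketch of both lookups on the Iceberg Schema API (the second restricts matching to top-level columns):

    // resolves nested names like "a.b.c" as well
    Type type = schema.findType(name);

    // or, to consider only top-level columns
    Type topLevelType = schema.asStruct().field(name).type();

    return SparkSchemaUtil.convert(type);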
  private SparkPushedDownAggregateUtil() {}

  // Build schema for pushed down aggregates. This schema will be used as the scan schema.
  public static StructType buildSchemaForPushedDownAggregate(
Can each BoundAggregate return the type it produces? It should either be the type of the underlying term or Long:
public Type type() {
if (op() == COUNT || op() == COUNT_STAR) {
return LongType.get();
} else {
return term.type();
}
}
With that information, you should be able to produce a schema more easily.
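Building on that, a hedged sketch of producing the result schema on the Iceberg side and converting it once for Spark; it assumes BoundAggregate gains the type() method above, and the field ids and generated column names are illustrative:

  List<Types.NestedField> resultFields = Lists.newArrayList();
  for (int i = 0; i < aggregates.size(); i++) {
    BoundAggregate<?> agg = aggregates.get(i);
    // each pushed aggregate becomes one column of the result row
    resultFields.add(Types.NestedField.optional(i + 1, "agg_" + i, agg.type()));
  }

  Schema aggSchema = new Schema(resultFields);
  StructType sparkAggSchema = SparkSchemaUtil.convert(aggSchema);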
      // down because the statistic info for complex are not available.
      return finalSchema;
    }
    if ((aggregates.get(index)).op().name().equals("COUNT")) {
Can you use a switch when working with an operation? I don't understand why this would use string comparison.
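A minimal sketch of switching on the operation instead; the fields, name, and type variables and the field construction shown here are illustrative stand-ins for whatever the surrounding method builds:

    switch (aggregates.get(index).op()) {
      case COUNT:
      case COUNT_STAR:
        // counts are always long
        fields[index] = DataTypes.createStructField(name, DataTypes.LongType, false);
        break;
      case MIN:
      case MAX:
        // min/max keep the type of the underlying column
        fields[index] = DataTypes.createStructField(name, SparkSchemaUtil.convert(type), true);
        break;
      default:
        throw new UnsupportedOperationException(
            "Unsupported pushed-down aggregate: " + aggregates.get(index).op());
    }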
        || dataType instanceof ArrayType
        || dataType instanceof MapType) {
      // not building pushed down aggregate schema for complex types to disable aggregate push
      // down because the statistic info for complex are not available.
Stats for nested structs are kept, aren't they?
        || dataType instanceof MapType) {
      // not building pushed down aggregate schema for complex types to disable aggregate push
      // down because the statistic info for complex are not available.
      return finalSchema;
It is fine if this doesn't support nested types, but it isn't correct for this function to return the unfinished schema. Instead it should throw a ValidationException that aborts the aggregate pushdown.
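A minimal sketch of that failure path, using Iceberg's ValidationException and its format-string constructor; the fieldName variable is illustrative:

    if (dataType instanceof StructType
        || dataType instanceof ArrayType
        || dataType instanceof MapType) {
      // abort the pushdown instead of returning a partially built schema
      throw new ValidationException(
          "Cannot push down aggregate for column %s: no column statistics for complex types",
          fieldName);
    }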
      }
    }
  }
  return finalSchema;
I think that this should be done in Iceberg code rather than in Spark code. This can produce an Iceberg schema and then convert it. That would allow us to more easily reuse this for other engines.
    // get the statistics info from DATA_FILES, calculate the aggregate values (min/max/count)
    // and use these values to build an internalRow.
    Dataset<Row> metadataRows =
        SparkTableUtil.loadMetadataTable(spark, table, MetadataTableType.DATA_FILES);
I think there should be a flag to enable/disable running a parallel operation from within job planning. Nesting a DataFrame operation like this does not seem like a good idea in all cases. I can certainly see why we want to do this, but I think we should have an implementation that uses planFiles first and then add this as an optimization.
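A hedged sketch of the planFiles-based approach for a single MIN(col), reading the bounds from each data file's metadata on the driver; fieldId and type (the aggregated column's field id and Iceberg type) are assumed to be in scope:

    Comparable<Object> min = null;
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        ByteBuffer lower = task.file().lowerBounds().get(fieldId);
        if (lower == null) {
          // metrics are missing for this file, so the aggregate cannot be answered from stats
          throw new ValidationException("Missing lower bound for field %s", fieldId);
        }

        Comparable<Object> value = Conversions.fromByteBuffer(type, lower);
        min = (min == null || value.compareTo(min) < 0) ? value : min;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }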
  @Override
  public <T> Expression aggregate(UnboundAggregate<T> agg) {
    return agg.bind(struct, caseSensitive);
Can you update the Binder tests to include aggregate expressions?
    List<Object> valuesInSparkInternalRow = Lists.newArrayList();

    for (int i = 0; i < fields.length; i++) {
      if (fields[i].name().contains("MIN")) {
The field name should not be used for this. It is going to be very slow. Instead, I think this should use the aggregate expressions. Why not implement eval for the aggregate expressions and just delegate to that?
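A minimal sketch of that delegation, relying on BoundAggregate implementing Bound<T> (which already defines eval(StructLike)); boundAggregates and statsRow are assumed to be the bound expressions and a StructLike view of the statistics row:

    // delegate to the bound expression instead of string-matching on field names
    for (BoundAggregate<?> agg : boundAggregates) {
      valuesInSparkInternalRow.add(agg.eval(statsRow));
    }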
    Dataset dataset =
        metadataRows.selectExpr(
            "lower_bounds", "upper_bounds", "record_count", "null_value_counts");
    Row[] statisticRows = (Row[]) dataset.collect();
Why not parallelize the aggregation on each task to minimize the number of rows (and remove maps) returned to the driver?
@huaxingao, this is a great start.
I'd like to get the expression changes in before getting the Spark changes in, if that's okay with you. Those are nearly ready to go. I think the implementation in SparkPushedDownAggregateUtil is what needs to change the most. A lot of that can be done more cleanly (and more efficiently) by using the eval method on the expressions.
@rdblue Thank you very much for your detailed review! I will address the comments. I will split this PR and get the expression changes ready first.
This PR pushes down min/max/count to iceberg.

For SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table, without this PR, iceberg will do SELECT col FROM table, and Spark will calculate MIN(col), MAX(col), COUNT(col), COUNT(*). With this PR, iceberg will do SELECT MIN(col), MAX(col), COUNT(col), COUNT(*) FROM table. MIN, MAX, COUNT will be calculated on the iceberg side using the statistics info in the manifest file.

I have the following changes:
- Add a table property AGGREGATE_PUSHDOWN_ENABLED. The default is true.
- SparkScanBuilder implements SupportsPushDownAggregates, so MIN/MAX/COUNT can be pushed down to iceberg, and then iceberg will read the statistics information (upper_bound, lower_bound, record_count) from the manifest file, calculate the MIN/MAX/COUNT, build a Spark InternalRow and pass the InternalRow to Spark.
- In SparkScanBuilder.pushAggregation, if any of the aggregates can't be pushed down, e.g. upper_bound, lower_bound, record_count are not available, we will fall back.
- Add SparkLocalScan. It is a special Scan which happens locally on the Spark driver instead of on the executors. If MIN/MAX/COUNT are pushed down, iceberg will create a SparkLocalScan, and then iceberg doesn't need to plan files, create FileScanTasks, and send the tasks to executors. Instead, iceberg can just do a local scan on the Spark driver.

In the tests, I look at the explain result of the physical plan to check if MIN/MAX/COUNT are pushed down. For example, for SELECT max(data), min(data) FROM table, I compare the plan when MIN/MAX/COUNT are not pushed down with the plan when they are pushed down, and check whether the physical plan contains MAX(data)/MIN(data)/COUNT(data).
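A hedged sketch of that kind of test check; the table name is illustrative and the property key follows the diff above:

    spark.sql(
        "CREATE TABLE db.t (id BIGINT, data STRING) USING iceberg "
            + "TBLPROPERTIES ('aggregate.pushdown.enabled'='true')");

    String plan =
        spark.sql("EXPLAIN SELECT MAX(data), MIN(data), COUNT(data) FROM db.t")
            .collectAsList().get(0).getString(0);

    // when the aggregates are pushed down, they show up in the scan node of the plan
    Assert.assertTrue(plan.contains("MAX(data)") && plan.contains("MIN(data)"));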