Flink: Support filter pushdown in IcebergTableSource #1893
Conversation
@openinx could you help me review the PR when you have time? Thanks.
Thanks @zhangjun0x01 for contributing, I will review this patch today or tomorrow.
Force-pushed from 2fe26ee to e05b6e9
public static Expression convert(org.apache.flink.table.expressions.Expression flinkExpression) {
  if (!(flinkExpression instanceof CallExpression)) {
    return null;
I'm not familiar with Flink, but I wonder whether it is a valid use case to return null here and in the other places; should we throw instead?
Iceberg supports the following expressions:
https://iceberg.apache.org/api/#expressions
Some expressions supported by Flink are not supported by Iceberg; I did not convert those because they cannot be used for an Iceberg table scan.
If it is an unsupported expression, there is no need to do filter pushdown, so I don't think we should throw an exception.
I see. Do we want to return Optional<Expression> here then? That signals that the returned value could be absent, so when we add the converted expression to the list we can decide not to add empty results, and we don't have to do a null check when calling toString?
Thanks for your suggestion, I updated it to Optional, and I added a not-pushed-down test case which returns Optional.empty().
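For reference, a minimal sketch of the Optional-based contract (the convertCall helper here is hypothetical, just to show the shape):

public static Optional<Expression> convert(org.apache.flink.table.expressions.Expression flinkExpression) {
  if (!(flinkExpression instanceof CallExpression)) {
    // unsupported expressions are skipped instead of returning null
    return Optional.empty();
  }

  return convertCall((CallExpression) flinkExpression); // hypothetical helper
}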
FieldReferenceExpression field = (FieldReferenceExpression) args.get(0);
List<ResolvedExpression> values = args.subList(1, args.size());

List<Object> expressions = values.stream().filter(
Nit: I think these are input values, not expressions
}

private static boolean literalOnRight(List<ResolvedExpression> args) {
  return args.get(0) instanceof FieldReferenceExpression && args.get(1) instanceof ValueLiteralExpression ? true :
Nit: no need to have ? true : false
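i.e. the simplified form returns the boolean expression directly (assuming the elided else-branch was false, as the nit suggests):

private static boolean literalOnRight(List<ResolvedExpression> args) {
  return args.get(0) instanceof FieldReferenceExpression &&
      args.get(1) instanceof ValueLiteralExpression;
}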
private FlinkFilters() {
}

private static final Map<BuiltInFunctionDefinition, Operation> FILTERS = ImmutableMap
Seems that this is mapping Flink operations to Iceberg operations for the following switch statement? If that's the case we probably don't really need it, and could directly switch based on the input flinkExpression.getFunctionDefinition()?
Also, there's a recent change that requires rewriting NaN in equals/notEquals to isNaN/notNaN, as Iceberg's equals no longer accepts NaN as a literal, so we will have to rewrite here as well. Here is a similar change done in Spark filters.
> Seems that this is mapping flink operations to iceberg operations in the following switch statement? If that's the case we probably don't really need it, and could directly switch based on the input flinkExpression.getFunctionDefinition()?

flinkExpression.getFunctionDefinition() returns an implementation class of FunctionDefinition, which cannot be used directly in a switch, so we add a mapping, similar to SparkFilters.
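For context, a trimmed sketch of that mapping (Operation here is Iceberg's Expression.Operation; the entries shown are illustrative, not the full list):

private static final Map<BuiltInFunctionDefinition, Operation> FILTERS = ImmutableMap
    .<BuiltInFunctionDefinition, Operation>builder()
    .put(BuiltInFunctionDefinitions.EQUALS, Operation.EQ)
    .put(BuiltInFunctionDefinitions.NOT_EQUALS, Operation.NOT_EQ)
    .put(BuiltInFunctionDefinitions.GREATER_THAN, Operation.GT)
    .put(BuiltInFunctionDefinitions.IS_NULL, Operation.IS_NULL)
    .build();

// look up the Iceberg operation, then switch over the enum
Operation op = FILTERS.get(call.getFunctionDefinition());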
@@ -112,6 +119,11 @@ public String explainSource() {
    explain += String.format(", LimitPushDown : %d", limit);
  }

  if (filters != null) {
    explain += String.format(", FilterPushDown,the filters :%s",
        filters.stream().map(filter -> filter.toString()).collect(Collectors.joining(",")));
I think returning null in the filters class will cause an NPE here as well.
Nit: it's simpler to rewrite this as:
explain += String.format(", FilterPushDown,the filters :%s", Joiner.on(",").join(filters));
Force-pushed from e05b6e9 to 7f7cbf5
@yyanyy thanks for your review, I updated all of them.
Object value = valueLiteralExpression.getValueAs(clazz).get();

BuiltInFunctionDefinition functionDefinition = (BuiltInFunctionDefinition) call.getFunctionDefinition();
if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
I think we may want to rewrite NOT_EQUALS to notNaN as well, since notEquals in Iceberg also doesn't accept NaN as a literal; I think SparkFilters doesn't do that because there's no NotEqualTo filter in Spark.
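A hedged sketch of that rewrite, mirroring the SparkFilters pattern (isNaN/notNaN are existing Iceberg Expressions methods; the surrounding variable names follow the diff above):

// Iceberg rejects NaN literals, so rewrite `col = NaN` and `col <> NaN`
if (NaNUtil.isNaN(value)) {
  if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
    return Optional.of(Expressions.isNaN(name));
  } else if (functionDefinition.equals(BuiltInFunctionDefinitions.NOT_EQUALS)) {
    return Optional.of(Expressions.notNaN(name));
  }
}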
Force-pushed from 658b61d to 629c5af
switch (op) {
  case IS_NULL:
    FieldReferenceExpression isNullFilter = (FieldReferenceExpression) call.getResolvedChildren().get(0);
    return Optional.of(Expressions.isNull(isNullFilter.getName()));
How does FieldReferenceExpression.getName() reference nested fields?
I tested it. For example, we have a table whose Flink DDL is like this:

CREATE TABLE iceberg_nested_test (
  id VARCHAR,
  title VARCHAR,
  properties ROW(`foo` VARCHAR)
) WITH (
  'connector'='iceberg'
);

If the query SQL is select * from iceberg_nested_test where properties is null, Flink supports filter pushdown and the field name is properties. If the SQL is select * from iceberg_nested_test where properties.foo is null, Flink does not support filter pushdown and the code never enters the IcebergTableSource#applyPredicate method.
Okay, sounds fine that Flink doesn't currently support predicate pushdown on nested fields. @openinx, any plans to change this?
Yes, Flink does not support nested field pushdown now. We will need to file an issue to address it in the Apache Flink repo.
I created a Flink issue to track this: https://issues.apache.org/jira/browse/FLINK-20767
}

String name = fieldReferenceExpression.getName();
Class clazz = valueLiteralExpression.getOutputDataType().getConversionClass();
Class is parameterized, so this should be Class<?>.
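i.e. the declaration in the diff would become:

Class<?> clazz = valueLiteralExpression.getOutputDataType().getConversionClass();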
String name = fieldReferenceExpression.getName();
Class clazz = valueLiteralExpression.getOutputDataType().getConversionClass();
Object value = valueLiteralExpression.getValueAs(clazz).get();
ValueLiteralExpression allows the value to be null, in which case get here will throw an exception. How is this avoided? Does the parser reject col = null expressions?
@openinx may be able to help here, too.
Actually, a few lines down there is an assertion that the value isn't null. This looks like a bug to me.
I tested it: if the SQL is select * from mytable where data = null, Flink does not push down the filter, and we cannot get any data. If the SQL is select * from mytable where data is null, it works normally and enters the IS_NULL branch of the switch.
I think it would be good to fix this case rather than making the assumption that Flink won't push the = null filter. Handling null will be good for maintainability.
I agree with @rdblue that we should handle null in this function rather than assuming Flink won't push down the null.
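A minimal sketch of that handling, assuming Flink's getValueAs already returns an empty Optional for a null literal (the helper name is illustrative):

// Optional.empty() when the literal is null, so the caller abandons
// pushdown for this predicate instead of calling a bare get()
private static Optional<Object> convertLiteral(ValueLiteralExpression expression) {
  return expression.getValueAs(expression.getOutputDataType().getConversionClass())
      .map(o -> (Object) o);
}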
expression -> {
  if (expression instanceof ValueLiteralExpression) {
    return !((ValueLiteralExpression) flinkExpression).isNull();
  }
This should not discard anything that is not a ValueLiteralExpression. Instead, if there is a non-literal this should either throw IllegalArgumentException or return Optional.empty to signal that the expression cannot be converted.
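In the surrounding conversion, that guard might look like this (a sketch, not the PR's code):

if (!(expression instanceof ValueLiteralExpression) ||
    ((ValueLiteralExpression) expression).isNull()) {
  // a non-literal or null argument means this predicate cannot be converted
  return Optional.empty();
}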
case IN:
  List<ResolvedExpression> args = call.getResolvedChildren();
  FieldReferenceExpression field = (FieldReferenceExpression) args.get(0);
  List<ResolvedExpression> values = args.subList(1, args.size());
I think this should be converted to List<ValueLiteralExpression> to simplify value conversion.
IN, BETWEEN, and NOT_BETWEEN are automatically converted by Flink, so we will not enter the IN block. Should we delete the IN branch?
If we're sure that Flink won't enter the IN block, then I think we should remove it. Please add a comment saying IN will be converted to multiple ORs.
I removed the IN block and added some comments.
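The added comment presumably reads along these lines (a sketch):

// Flink's optimizer rewrites IN into a chain of OR'd equality predicates
// before pushdown, so no IN case is needed here.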
  return !((ValueLiteralExpression) flinkExpression).isNull();
}

return false;
Null values can't be ignored. This should either return Optional.empty or throw IllegalArgumentException if there is a null value.
return Optional.of(Expressions.in(field.getName(), inputValues));

case NOT:
  Optional<Expression> child = convert(call.getResolvedChildren().get(0));
There are several calls to getResolvedChildren().get(0). I think that should be converted to a method that validates there is only one child and also validates the type:

private <T extends ResolvedExpression> Optional<T> getOnlyChild(CallExpression call, Class<T> expectedChildClass) {
  List<ResolvedExpression> children = call.getResolvedChildren();
  if (children.size() != 1) {
    return Optional.empty();
  }

  ResolvedExpression child = children.get(0);
  if (!expectedChildClass.isInstance(child)) {
    return Optional.empty();
  }

  return Optional.of(expectedChildClass.cast(child));
}
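A possible use at the NOT case, combining this helper with the Optional-returning convert (illustrative):

case NOT:
  return getOnlyChild(call, CallExpression.class)
      .flatMap(FlinkFilters::convert)
      .map(Expressions::not);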
case NOT:
  Optional<Expression> child = convert(call.getResolvedChildren().get(0));
  if (child.isPresent()) {
    return Optional.of(Expressions.not(child.get()));
This can be child.map(Expressions::not).
  }
}

return Optional.of(function.apply(name, value));
The literal value needs to be converted to Iceberg's internal representation before being passed to create an expression. Flink will return LocalDate, LocalTime, LocalDateTime, etc. from the getValueAs method, and it isn't clear whether the value stored in the literal is the correct representation for other types as well.
@openinx, could you help recommend how to do the conversion here?
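One possible direction, using Iceberg's org.apache.iceberg.util.DateTimeUtil to move the java.time values into Iceberg's internal representation (a sketch of the idea, not necessarily what the PR settled on):

// dates become days since epoch; times and timestamps become microseconds
static Object toIcebergLiteral(Object value) {
  if (value instanceof LocalDateTime) {
    return DateTimeUtil.microsFromTimestamp((LocalDateTime) value);
  } else if (value instanceof LocalDate) {
    return DateTimeUtil.daysFromDate((LocalDate) value);
  } else if (value instanceof LocalTime) {
    return DateTimeUtil.microsFromTime((LocalTime) value);
  }
  return value;
}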
@@ -102,4 +105,154 @@ public void testLimitPushDown() {
    Assert.assertEquals("should have 1 record", 1, mixedResult.size());
    Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
  }

  @Test
  public void testFilterPushDown() {
Tests should be broken into individual methods that are each a test case. To share code, use @Before and @After and different test suites.
Can you also add a test case that listens for a ScanEvent and validates that the expression was correctly passed to Iceberg?
I added a listener to validate the filter pushdown in TestFlinkTableSource.
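Roughly, the listener setup can look like this (Listeners and ScanEvent are Iceberg's org.apache.iceberg.events API; the lastScanEvent field is introduced here for illustration):

private ScanEvent lastScanEvent = null;

@Before
public void registerScanEventListener() {
  // capture the most recent scan so tests can assert on the pushed filter
  Listeners.register(event -> lastScanEvent = event, ScanEvent.class);
}

// in a test, after executing the query:
// Assert.assertEquals("Should push down the expected filter",
//     expectedFilter.toString(), lastScanEvent.filter().toString());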
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.NaNUtil;

public class FlinkFilters {
This class needs an extensive test suite that checks the conversion from expected Flink expressions, not just a test for the source.
The conversion needs to cover at least these cases:
- Equals with null
- Not equals with null
- In with null
- Not in with null
- Equals with NaN
- Not equals with NaN
- In with NaN
- Not in with NaN
- All inequalities with null
- All inequalities with NaN
- All expressions with a non-null and non-NaN value (preferably one string and one numeric)
- Each data type that is supported by Iceberg/Flink
I looked up the Flink docs and the source code and tested it; it seems that NaN and Infinity are not supported by Flink now.
The data types supported by Flink are listed here.
Thanks for working on this @zhangjun0x01! It looks like a great start to me, and I'd really like to get this working in Flink.
@rdblue, thank you very much for your review. I am very sorry that some situations were not well considered; I will be more careful next time, and I will update the PR later.
No problem, this is why we review!
I'm sorry that I did not review this PR in time before (was focusing on Flink CDC DataStream/SQL test cases and more optimization work for the next release, 0.11.0); I will take a look tomorrow.
I'd like to get this into the 0.11.0 release, if possible. Thanks for working on this, @zhangjun0x01! It will be great to have this feature done.
Force-pushed from edad5ee to 7daffa6
@rdblue thanks very much for your review, I updated it.
Force-pushed from d33aa49 to 217d059
org.apache.iceberg.expressions.Expressions.equal("field1", 1));

Assert.assertEquals("Predicate operation should match", expected.op(), not.op());
assertPredicatesMatch((UnboundPredicate<?>) expected.child(), (UnboundPredicate<?>) not.child());
These casts are unnecessary.
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java (outdated review thread, resolved)
@zhangjun0x01, there are still a few things to fix in the tests, mostly minor, but I also found a major problem. The filter pushdown tests only need to run for one catalog and one file format, because the purpose of those tests is to validate the assumptions of the …
flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSource.java (outdated review thread, resolved)
Force-pushed from 217d059 to c93eaa7
Force-pushed from c93eaa7 to 138fa46
I refactored it with …
Thanks, @zhangjun0x01! It is great that this will be in the 0.11.0 release. Thanks for getting it done!
add filter push down for IcebergTableSource