
Flink: Support filter pushdown in IcebergTableSource #1893

Merged: 11 commits, Jan 18, 2021

Conversation

zhangjun0x01
Contributor

add filter push down for IcebergTableSource

@github-actions github-actions bot added the flink label Dec 9, 2020
@zhangjun0x01
Contributor Author

@openinx could you help review this PR when you have time? Thanks!

@openinx
Member

openinx commented Dec 9, 2020

Thanks @zhangjun0x01 for contributing, I will review this patch today or tomorrow.


public static Expression convert(org.apache.flink.table.expressions.Expression flinkExpression) {
  if (!(flinkExpression instanceof CallExpression)) {
    return null;
Contributor

I'm not familiar with Flink. I wonder whether returning null here (and in the other places that return null) is a valid use case; should we throw instead?

Contributor Author

Iceberg supports the following expressions:
https://iceberg.apache.org/api/#expressions
For expressions that Flink supports but Iceberg does not, I did not convert them, because they cannot be used in an Iceberg table scan.

Contributor Author

If it is an unsupported expression, there is no need to push the filter down; I don't think we should throw an exception.

Contributor

@yyanyy yyanyy Dec 17, 2020

I see. Do we want to return Optional<Expression> here then? That would signal that the returned value may be absent, so when we add converted expressions to the list we can skip the empty ones and avoid a null check when calling toString.

Contributor Author

Thanks for your suggestion; I updated it to return Optional, and I added a test case for a filter that cannot be pushed down, which returns Optional.empty().
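For reference, a minimal sketch of the Optional-based pattern discussed above (the caller-side stream and the flinkFilters name are illustrative, not the PR's exact code):

  // convert signals "cannot convert" with Optional.empty() instead of null
  public static Optional<Expression> convert(org.apache.flink.table.expressions.Expression flinkExpression) {
    if (!(flinkExpression instanceof CallExpression)) {
      return Optional.empty();
    }
    // ... convert the supported CallExpressions here ...
    return Optional.empty();
  }

  // caller side: keep only the filters that converted successfully
  List<Expression> expressions = flinkFilters.stream()
      .map(FlinkFilters::convert)
      .filter(Optional::isPresent)
      .map(Optional::get)
      .collect(Collectors.toList());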

FieldReferenceExpression field = (FieldReferenceExpression) args.get(0);
List<ResolvedExpression> values = args.subList(1, args.size());

List<Object> expressions = values.stream().filter(
Contributor

Nit: I think these are input values, not expressions

}

private static boolean literalOnRight(List<ResolvedExpression> args) {
  return args.get(0) instanceof FieldReferenceExpression && args.get(1) instanceof ValueLiteralExpression ? true :
Contributor

Nit: no need to have ? true : false
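The simplified form the nit suggests (same behavior, no ternary):

  private static boolean literalOnRight(List<ResolvedExpression> args) {
    return args.get(0) instanceof FieldReferenceExpression &&
        args.get(1) instanceof ValueLiteralExpression;
  }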

private FlinkFilters() {
}

private static final Map<BuiltInFunctionDefinition, Operation> FILTERS = ImmutableMap
Contributor

Seems that this is mapping Flink operations to Iceberg operations for the following switch statement? If that's the case we probably don't really need it, and could switch directly on the input flinkExpression.getFunctionDefinition()?
Also, there's a recent change that requires rewriting NaN in equals/notEquals to isNaN/notNaN, since Iceberg's equals no longer accepts NaN as a literal, so we will have to rewrite here as well. Here is a similar change done in Spark filters.

Contributor Author

Seems that this is mapping flink operations to iceberg operations in the following switch statement? If that's the case we probably don't really need it, and could directly switch based on the input flinkExpression. getFunctionDefinition()?

flinkExpression.getFunctionDefinition() returns an implementation class of FunctionDefinition, which cannot be used directly in a switch statement, so we add a mapping, similar to SparkFilters.
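A sketch of what such a mapping can look like (the entries shown are illustrative, not the complete set from the PR):

  private static final Map<BuiltInFunctionDefinition, Operation> FILTERS = ImmutableMap
      .<BuiltInFunctionDefinition, Operation>builder()
      .put(BuiltInFunctionDefinitions.EQUALS, Operation.EQ)
      .put(BuiltInFunctionDefinitions.NOT_EQUALS, Operation.NOT_EQ)
      .put(BuiltInFunctionDefinitions.GREATER_THAN, Operation.GT)
      .put(BuiltInFunctionDefinitions.LESS_THAN, Operation.LT)
      .put(BuiltInFunctionDefinitions.IS_NULL, Operation.IS_NULL)
      .put(BuiltInFunctionDefinitions.NOT, Operation.NOT)
      .put(BuiltInFunctionDefinitions.AND, Operation.AND)
      .put(BuiltInFunctionDefinitions.OR, Operation.OR)
      .build();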

@@ -112,6 +119,11 @@ public String explainSource() {
explain += String.format(", LimitPushDown : %d", limit);
}

if (filters != null) {
  explain += String.format(", FilterPushDown,the filters :%s",
      filters.stream().map(filter -> filter.toString()).collect(Collectors.joining(",")));
Contributor

I think returning null in the filters class will cause an NPE here as well.

Member

nit: it's simpler to rewrite this as:

 explain += String.format(", FilterPushDown,the filters :%s", Joiner.on(",").join(filters));

@zhangjun0x01
Contributor Author

@yyanyy thanks for your review, I updated all of them.

Object value = valueLiteralExpression.getValueAs(clazz).get();

BuiltInFunctionDefinition functionDefinition = (BuiltInFunctionDefinition) call.getFunctionDefinition();
if (functionDefinition.equals(BuiltInFunctionDefinitions.EQUALS)) {
Contributor

I think we may want to rewrite NOT_EQUALS to notNaN as well, since notEquals in Iceberg also doesn't accept NaN as a literal; I think SparkFilters doesn't do that because there's no NotEqualTo filter in Spark.
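A minimal sketch of the suggested rewrite, assuming the field name and literal value have already been extracted (NaNUtil and Expressions are existing Iceberg classes; the helper itself is hypothetical):

  private static Optional<Expression> convertEquality(String name, Object value, boolean isEquals) {
    // Iceberg's equal/notEqual reject NaN literals, so rewrite to isNaN/notNaN
    if (NaNUtil.isNaN(value)) {
      return Optional.of(isEquals ? Expressions.isNaN(name) : Expressions.notNaN(name));
    }
    return Optional.of(isEquals ? Expressions.equal(name, value) : Expressions.notEqual(name, value));
  }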

@zhangjun0x01 zhangjun0x01 force-pushed the filterPushDown branch 2 times, most recently from 658b61d to 629c5af Compare December 18, 2020 04:06
@rdblue rdblue changed the title Flink : add filter push down for IcebergTableSource Flink: Support filter pushdown for IcebergTableSource Dec 18, 2020
@rdblue rdblue changed the title Flink: Support filter pushdown for IcebergTableSource Flink: Support filter pushdown in IcebergTableSource Dec 18, 2020
switch (op) {
  case IS_NULL:
    FieldReferenceExpression isNullFilter = (FieldReferenceExpression) call.getResolvedChildren().get(0);
    return Optional.of(Expressions.isNull(isNullFilter.getName()));
Contributor

How does FieldReferenceExpression.getName() reference nested fields?

Contributor Author

@zhangjun0x01 zhangjun0x01 Dec 21, 2020

I tested it. For example, if we have a table whose Flink DDL is like this:

CREATE TABLE iceberg_nested_test (
             id VARCHAR,
             title VARCHAR,
             properties ROW(`foo` VARCHAR)
) WITH (
     'connector'='iceberg'
);

If the query SQL is select * from iceberg_nested_test where properties is null, Flink supports the filter push down and the field name is properties. If the SQL is select * from iceberg_nested_test where properties.foo is null, Flink does not support the push down, and the code never enters the IcebergTableSource#applyPredicate method.

Contributor

Okay, sounds fine that Flink doesn't currently support predicate pushdown on nested fields. @openinx, any plans to change this?

Member

Yes, Flink does not support nested field push down now. We will need to file an issue in the Apache Flink repo to address it.

Contributor Author

I created a Flink issue to track this: https://issues.apache.org/jira/browse/FLINK-20767

}

String name = fieldReferenceExpression.getName();
Class clazz = valueLiteralExpression.getOutputDataType().getConversionClass();
Contributor

Class is parameterized, so this should be Class<?>


String name = fieldReferenceExpression.getName();
Class clazz = valueLiteralExpression.getOutputDataType().getConversionClass();
Object value = valueLiteralExpression.getValueAs(clazz).get();
Contributor

ValueLiteralExpression allows the value to be null, in which case get here will throw an exception. How is this avoided? Does the parser reject col = null expressions?

@openinx may be able to help here, too.

Contributor

Actually, a few lines down there is an assertion that the value isn't null. This looks like a bug to me.

Contributor Author

I tested it. If the SQL is select * from mytable where data = null, Flink does not push the filter down, and the query returns no data.
If the SQL is select * from mytable where data is null, it works normally and enters the IS_NULL branch of the switch.

Contributor

I think it would be good to fix this case rather than making the assumption that Flink won't push the = null filter. Handling null will be good for maintainability.

Member

I agree with @rdblue that we should handle null in this function rather than assume Flink won't push it down.
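A sketch of that handling; it relies on ValueLiteralExpression.getValueAs returning an empty Optional for a null literal (the helper name is illustrative):

  private static Optional<?> literalValue(ValueLiteralExpression literal) {
    Class<?> clazz = literal.getOutputDataType().getConversionClass();
    // empty for a null literal such as `col = null`, so the caller can
    // return Optional.empty() instead of calling .get() and throwing
    return literal.getValueAs(clazz);
  }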

expression -> {
  if (expression instanceof ValueLiteralExpression) {
    return !((ValueLiteralExpression) flinkExpression).isNull();
  }
Contributor

This should not discard anything that is not a ValueLiteralExpression. Instead, if there is a non-literal this should either throw IllegalArgumentException or return Optional.empty to signal that the expression cannot be converted.
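A sketch of the second option, returning Optional.empty instead of silently dropping non-literal members:

  // inside the IN handling: every member after the field must be a literal
  for (ResolvedExpression expr : values) {
    if (!(expr instanceof ValueLiteralExpression)) {
      return Optional.empty();  // cannot convert this filter; skip push down
    }
  }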

case IN:
  List<ResolvedExpression> args = call.getResolvedChildren();
  FieldReferenceExpression field = (FieldReferenceExpression) args.get(0);
  List<ResolvedExpression> values = args.subList(1, args.size());
Contributor

I think this should be converted to List<ValueLiteralExpression> to simplify value conversion.

Contributor Author

IN, BETWEEN, and NOT_BETWEEN are automatically converted by Flink, so we will never enter the IN block.
Should we delete the IN branch?

Member

If we're sure that Flink won't enter the IN block, then I think we should remove it. Please add a comment saying that IN will be converted to multiple ORs.

Contributor Author

@zhangjun0x01 zhangjun0x01 Jan 5, 2021

I removed the IN block and added some comments.

return !((ValueLiteralExpression) flinkExpression).isNull();
}

return false;
Contributor

Null values can't be ignored. This should either return Optional.empty or throw IllegalArgumentException if there is a null value.

return Optional.of(Expressions.in(field.getName(), inputValues));

case NOT:
  Optional<Expression> child = convert(call.getResolvedChildren().get(0));
Contributor

There are several calls to getResolvedChildren().get(0). I think that should be converted to a method that validates there is only one child and also validates the type:

  private <T extends ResolvedExpression> Optional<T> getOnlyChild(CallExpression call, Class<T> expectedChildClass) {
    List<ResolvedExpression> children = call.getResolvedChildren();
    if (children.size() != 1) {
      return Optional.empty();
    }

    ResolvedExpression child = children.get(0);
    if (!expectedChildClass.isInstance(child)) {
      return Optional.empty();
    }

    return Optional.of(expectedChildClass.cast(child));
  }

case NOT:
  Optional<Expression> child = convert(call.getResolvedChildren().get(0));
  if (child.isPresent()) {
    return Optional.of(Expressions.not(child.get()));
Contributor

This can be child.map(Expressions::not).

}
}

return Optional.of(function.apply(name, value));
Contributor

The literal value needs to be converted to Iceberg's internal representation before being used to create an expression. Flink returns LocalDate, LocalTime, LocalDateTime, etc. from the getValueAs method, and it isn't clear whether the value stored in the literal is the correct representation for other types as well.

@openinx, could you help recommend how to do the conversion here?
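For the java.time cases, a sketch of such a conversion using Iceberg's DateTimeUtil (the set of handled types is illustrative, not exhaustive):

  private static Object convertLiteral(Object value) {
    if (value instanceof LocalDate) {
      return DateTimeUtil.daysFromDate((LocalDate) value);            // days since epoch
    } else if (value instanceof LocalTime) {
      return DateTimeUtil.microsFromTime((LocalTime) value);          // micros since midnight
    } else if (value instanceof LocalDateTime) {
      return DateTimeUtil.microsFromTimestamp((LocalDateTime) value); // micros since epoch
    }
    return value;  // other types pass through unchanged
  }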

@@ -102,4 +105,154 @@ public void testLimitPushDown() {
Assert.assertEquals("should have 1 record", 1, mixedResult.size());
Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
}

@Test
public void testFilterPushDown() {
Contributor

Tests should be broken into individual methods that are each a test case. To share code, use @Before and @After and different test suites.

Contributor

Can you also add a test case that listens for a ScanEvent and validates that the expression was correctly passed to Iceberg?

Contributor Author

I added a listener in TestFlinkTableSource to validate that the filter is pushed down.
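A sketch of what such a listener-based check can look like, using Iceberg's Listeners/ScanEvent API (the column name and filter are assumptions, not the PR's exact test):

  List<ScanEvent> events = Lists.newArrayList();
  Listeners.register(events::add, ScanEvent.class);

  // run a query that should push a filter down, e.g. SELECT * FROM t WHERE data = 'a',
  // then assert that the scan actually received that filter:
  Assert.assertEquals("Should push down the expected filter",
      Expressions.equal("data", "a").toString(), events.get(0).filter().toString());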

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.NaNUtil;

public class FlinkFilters {
Contributor

This class needs an extensive test suite that checks the conversion from expected Flink expressions, not just a test for the source.

The conversion needs to cover at least these cases:

  • Equals with null
  • Not equals with null
  • In with null
  • Not in with null
  • Equals with NaN
  • Not equals with NaN
  • In with NaN
  • Not in with NaN
  • All inequalities with null
  • All inequalities with NaN
  • All expressions with a non-null and non-NaN value (preferably one string and one numeric)
  • Each data type that is supported by Iceberg/Flink

Contributor Author

I looked up the Flink docs and the source code, and tested it; it seems that NaN and Infinity are not supported by Flink now.
The data types supported by Flink are listed here.

@rdblue
Contributor

rdblue commented Dec 18, 2020

Thanks for working on this @zhangjun0x01! It looks like a great start to me, and I'd really like to get this working in Flink.

@zhangjun0x01
Contributor Author

@rdblue, thank you very much for your review. I am very sorry that some situations were not well considered; I will be more careful next time, and I will update the PR later.

@rdblue
Contributor

rdblue commented Dec 21, 2020

I am very sorry that some situations were not well considered

No problem, this is why we review!

@openinx
Member

openinx commented Dec 22, 2020

I'm sorry that I did not review this PR in time (I was focusing on Flink CDC DataStream/SQL test cases and more optimization work for after the next release, 0.11.0); I will take a look tomorrow.

@rdblue
Contributor

rdblue commented Jan 16, 2021

I'd like to get this into the 0.11.0 release, if possible. Thanks for working on this, @zhangjun0x01! It will be great to have this feature done.

@zhangjun0x01 zhangjun0x01 force-pushed the filterPushDown branch 2 times, most recently from edad5ee to 7daffa6 Compare January 16, 2021 11:39
@zhangjun0x01
Contributor Author

@rdblue, thanks very much for your review. I updated it.

@zhangjun0x01 zhangjun0x01 force-pushed the filterPushDown branch 2 times, most recently from d33aa49 to 217d059 Compare January 16, 2021 14:33
org.apache.iceberg.expressions.Expressions.equal("field1", 1));

Assert.assertEquals("Predicate operation should match", expected.op(), not.op());
assertPredicatesMatch((UnboundPredicate<?>) expected.child(), (UnboundPredicate<?>) not.child());
Contributor

These casts are unnecessary.

@rdblue
Contributor

rdblue commented Jan 16, 2021

@zhangjun0x01, there are still a few things to fix in the tests, mostly minor. But I also found a major problem, which is that TestFlinkTableSource now takes a very long time to run. The problem is that the tests run for each format and for 3 different catalog configurations. That means each test runs 9 times and because it is a test that actually runs SQL it takes a long time. The whole suite takes much longer than needed; on my machine, it takes about 4 minutes.

The filter pushdown tests only need to run for one catalog and one file format because the purpose of those tests is to validate the assumptions of the FlinkFilter class with real filters from Flink SQL. The file format and catalog are orthogonal and we don't need to test each one of them. Can you change the parameterization to run with only Avro and a single catalog case?

@zhangjun0x01
Contributor Author


I refactored it to use HadoopCatalog and the Avro format.
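A sketch of pinning the parameterization to a single case, assuming a JUnit Parameterized runner (the names and parameter shape are illustrative):

  @Parameterized.Parameters(name = "catalog={0}, format={1}")
  public static Iterable<Object[]> parameters() {
    // one catalog, one format: the pushdown tests validate FlinkFilters'
    // assumptions against real Flink SQL, so catalog/format are orthogonal
    return Collections.singletonList(new Object[] {"testhadoop", FileFormat.AVRO});
  }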

@rdblue rdblue merged commit 70d309e into apache:master Jan 18, 2021
@rdblue
Contributor

rdblue commented Jan 18, 2021

Thanks, @zhangjun0x01! It is great that this will be in the 0.11.0 release. Thanks for getting it done!

XuQianJin-Stars pushed a commit to XuQianJin-Stars/iceberg that referenced this pull request Mar 22, 2021