[FLINK-36808][table-planner] Fix LookupJoin bug when used with filterable table source #26514
nit: the Jira number has an extra |
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
import org.apache.flink.table.planner.plan.schema.{IntermediateRelTable, LegacyTableSourceTable, TableSourceTable}
import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, ExpressionFormat, InputRefVisitor, JoinTypeUtil, LookupJoinUtil, RelExplainUtil, TemporalJoinUtil}
import org.apache.flink.table.planner.plan.utils._
nit: normal practice is to list all of the imported classes explicitly rather than use a wildcard import.
Thanks @davidradl, addressed 👍
@morazow If I am understanding this correctly, the filter-pushed-down joins cause the Volcano planner to uniquely identify I wonder what happens if one of the lookup joins supports filter pushdown but the other doesn't? Does the include the name of the source as well as the filter? It would be good to have tests for when both sources support filter pushdown, when neither does, and when only one does.
Thanks @davidradl, addressed your findings, please have another look 🤝
Yes, but it is based on the getDigests method, which produces a string like the one below:
Here we have the table name, but no information on the pushed-down filter conditions. For this issue to happen, the lookup tables must be the same (since the table name is in the digest), so we cannot test the case where one side supports filter pushdown and the other doesn't. Additionally, if the source doesn't support filter pushdown, it will be represented in the digest with the
Thanks for digging into this bug and driving this fix. I just left some comments.
@@ -209,6 +215,20 @@ abstract class CommonPhysicalLookupJoin(
      .itemIf("retry", retryOptions.getOrElse(""), retryOptions.isDefined)
  }

  private def getTableFilterString(t: TableSourceTable): String = {
    val filterOpt = t.abilitySpecs.collectFirst { case spec: FilterPushDownSpec => spec }
I'm wondering whether PartitionPushDownSpec also needs to be added to this part.
    super
      .explainTerms(pw)
      .item("table", tableIdentifier.asSummaryString())
      .item("joinType", JoinTypeUtil.getFlinkJoinType(joinType))
      .item("lookup", lookupKeys)
      .itemIf("where", whereString, whereString.nonEmpty)
      .itemIf("filter", filterPushdownString, filterPushdownString.nonEmpty)
nit: what about filterPushedDown?
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class UnionLookupJoinITCase extends StreamingTestBase {
Could you please move this test to LookupJoinITCase?
What is the purpose of the change
Brief description of the bug
Given a union all query:
In this situation the planner pushes the filter condition down into the table side of the lookup join, but the structure of the lookup joins stays the same, i.e., they have the same digests even though the underlying table / temporal table differs. This is a problem because the Calcite Volcano optimizer registers relation nodes as equivalent based on their digests.
This introduces the bug: when optimizing the Union, both branches of the query are treated as the same (even though they have different where clauses), and the cheapest plan found is the same for both lookup joins.
You can also see the effect if you put a filter that matches no rows first: the result will be empty, because the first lookup join is also used for the second branch of the union.
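The collision described above can be illustrated with a minimal, self-contained sketch. It is a simplified model and not Flink or Calcite code: `LookupJoin`, `digestWithoutFilter`, `digestWithFilter`, and `register` are hypothetical names, and the digest strings only approximate the real format. It assumes a planner that keeps the first node registered under each digest.

```scala
// Simplified, hypothetical model of digest-based registration.
// None of these names are Flink or Calcite classes.
object DigestCollisionSketch {
  final case class LookupJoin(
      table: String,
      lookupKeys: String,
      pushedDownFilter: Option[String])

  // Digest that ignores the pushed-down filter (models the buggy behaviour).
  def digestWithoutFilter(j: LookupJoin): String =
    s"LookupJoin(table=[${j.table}], lookup=[${j.lookupKeys}])"

  // Digest that also carries the pushed-down filter, when there is one.
  def digestWithFilter(j: LookupJoin): String = {
    val filterTerm = j.pushedDownFilter.map(f => s", filter=[$f]").getOrElse("")
    s"LookupJoin(table=[${j.table}], lookup=[${j.lookupKeys}]$filterTerm)"
  }

  // Registers nodes keyed by digest; a node whose digest is already
  // registered is treated as equivalent and dropped (first one wins).
  def register(
      nodes: Seq[LookupJoin],
      digest: LookupJoin => String): Map[String, LookupJoin] =
    nodes.foldLeft(Map.empty[String, LookupJoin]) { (registered, node) =>
      val key = digest(node)
      if (registered.contains(key)) registered else registered + (key -> node)
    }
}
```

With two lookup joins on the same table that differ only in their pushed-down filter, `register(nodes, digestWithoutFilter)` keeps a single entry (the first join), while `register(nodes, digestWithFilter)` keeps both.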
Alternative Solutions
A better solution would be to improve the LookupJoin expression so that the filter condition is also included in the table name. For example,
But this would require a lot of refactoring, mainly in the tests.
In this PR, I have opted for adding another filter term with the pushed-down filter conditions if the LookupJoin contains a table with filter pushdowns.
Brief change log
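The extra filter term can be sketched as follows. This is a standalone illustration under stated assumptions, not the PR's implementation: `AbilitySpec`, `FilterSpec`, `ProjectSpec`, `filterString`, and `explain` are hypothetical stand-ins for the planner's ability specs and explain terms. It picks the first filter spec with `collectFirst` and emits the term only when the string is non-empty, mirroring the `itemIf` call in the diff.

```scala
// Hypothetical stand-ins for the planner's ability specs; not Flink classes.
object FilterTermSketch {
  sealed trait AbilitySpec
  final case class FilterSpec(predicates: Seq[String]) extends AbilitySpec
  final case class ProjectSpec(fields: Seq[String]) extends AbilitySpec

  // Returns the pushed-down predicates joined by AND, or "" if the
  // table has no filter spec.
  def filterString(specs: Seq[AbilitySpec]): String =
    specs
      .collectFirst { case FilterSpec(predicates) => predicates.mkString(" AND ") }
      .getOrElse("")

  // Builds an explain string; the filter term appears only when non-empty.
  def explain(table: String, specs: Seq[AbilitySpec]): String = {
    val base = Seq(s"table=[$table]")
    val filter = filterString(specs)
    val terms = if (filter.nonEmpty) base :+ s"filter=[$filter]" else base
    terms.mkString("LookupJoin(", ", ", ")")
  }
}
```

Because the term is omitted when there is no pushed-down filter, explain output for tables without filter pushdown stays unchanged in this sketch.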
Verifying this change
The change adds a test case that reproduces the bug and passes with the fix.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation