[FLINK-36808][table-planner] Fix LookupJoin bug when used with filterable table source #26514

Open: wants to merge 1 commit into base: master


@morazow (Contributor) commented Apr 28, 2025

What is the purpose of the change

  • Fix a bug in LookupJoin that occurs when it is used with a filterable table source (one that supports filter pushdown) inside a UNION ALL

Brief description of the bug

Given a UNION ALL query that:

  • combines the results of two lookup joins
  • applies a different filter condition in each lookup join
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'OK' 
UNION ALL
SELECT
     s.id,
     s.name,
     s.txn_time,
     d.status
FROM `stream` AS `s` INNER JOIN `dim` FOR SYSTEM_TIME AS OF `s`.`txn_time` AS `d`
ON
     `s`.`id` = `d`.`id`
WHERE
     `d`.`status` = 'NOT_EXISTS';

In this situation the planner pushes the filter condition down into the table side of the lookup join, but the structure of the lookup joins stays the same: they end up with the same digest even though their tables / temporal tables differ.

This is a problem because the Calcite Volcano optimizer registers them as equivalent, since it identifies relation nodes by their digest.

As a result, when the Union is optimized, both parts of the query are treated as the same node (even though they have different WHERE clauses), and the cheapest plan found is the same for both lookup joins.

You can also see the effect by putting the non-existing filter first: the result is then empty, because the first lookup join is also used for the second part of the union.
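
The digest collision described above can be illustrated with a small toy model. This is only a sketch, not Calcite's or Flink's actual code: the class `DigestCollisionSketch`, the `Memo` and `LookupNode` types, and the `register` method are hypothetical names, and the only assumption carried over from the PR is that nodes are deduplicated by their digest string.

```java
import java.util.HashMap;
import java.util.Map;

public class DigestCollisionSketch {

    /** Hypothetical stand-in for a planner node; its digest omits the pushed-down filter. */
    static final class LookupNode {
        final String digest;
        final String pushedDownFilter;

        LookupNode(String digest, String pushedDownFilter) {
            this.digest = digest;
            this.pushedDownFilter = pushedDownFilter;
        }
    }

    /** Toy memo: the first node registered under a digest wins; later ones reuse it. */
    static final class Memo {
        private final Map<String, LookupNode> byDigest = new HashMap<>();

        LookupNode register(LookupNode node) {
            LookupNode existing = byDigest.putIfAbsent(node.digest, node);
            return existing != null ? existing : node;
        }
    }

    /** Builds a digest shaped like the one quoted in this PR, with no filter information. */
    static String digestWithoutFilter(String table) {
        return "LookupJoin(table=[" + table + "], joinType=[InnerJoin], lookup=[id=id])";
    }

    public static void main(String[] args) {
        Memo memo = new Memo();
        String digest = digestWithoutFilter("default_catalog.default_database.dim");

        LookupNode ok = memo.register(new LookupNode(digest, "status = 'OK'"));
        LookupNode notExists = memo.register(new LookupNode(digest, "status = 'NOT_EXISTS'"));

        // Both branches of the union resolve to the node that was registered first.
        System.out.println(ok == notExists);            // prints "true"
        System.out.println(notExists.pushedDownFilter); // prints "status = 'OK'"
    }
}
```

Swapping the registration order makes the `'NOT_EXISTS'` filter win for both branches, which matches the empty result described above.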

Alternative Solutions

A better solution would be to extend the LookupJoin digest so that the table term also includes the filter condition. For example,

LookupJoin(table=[default_catalog.default_database.dim, filter=[<PUSHED-DOWN-FILTERS>]], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])

But this would require a lot of refactoring, mainly in the tests.

In this PR, I have opted to add a separate filter term containing the pushed-down filter conditions whenever the LookupJoin table has filters pushed down.
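
The effect of the extra filter term on the digest can be sketched as follows. This is a hedged illustration in plain Java, not the planner code from this PR: `FilterAwareDigestSketch` and its `digest` method are hypothetical, and the term layout only mimics the digest strings quoted in this conversation.

```java
import java.util.ArrayList;
import java.util.List;

public class FilterAwareDigestSketch {

    /**
     * Builds a LookupJoin-like digest. The filter term is appended only when a
     * pushed-down filter is present, so digests of joins without filter pushdown
     * keep their previous shape.
     */
    static String digest(String table, String lookupKeys, String pushedDownFilter) {
        List<String> terms = new ArrayList<>();
        terms.add("table=[" + table + "]");
        terms.add("joinType=[InnerJoin]");
        terms.add("lookup=[" + lookupKeys + "]");
        if (pushedDownFilter != null && !pushedDownFilter.isEmpty()) {
            terms.add("filter=[" + pushedDownFilter + "]");
        }
        return "LookupJoin(" + String.join(", ", terms) + ")";
    }

    public static void main(String[] args) {
        String ok = digest("dim", "id=id", "status = 'OK'");
        String notExists = digest("dim", "id=id", "status = 'NOT_EXISTS'");

        // Different pushed-down filters now produce different digests.
        System.out.println(ok.equals(notExists)); // prints "false"
        System.out.println(ok);
    }
}
```

Making the term conditional keeps existing plan expectations for sources without filter pushdown unchanged, which is the trade-off against the larger refactoring mentioned above.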

Brief change log

  • Add a unit test that reproduces the bug
  • Add (one alternative) fix to resolve the bug

Verifying this change

This change adds a test case that reproduces the bug and passes once the fix is applied.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot (Collaborator) commented Apr 28, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@davidradl (Contributor)

nit: the Jira number has an extra 0 at the end in the PR title

import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
import org.apache.flink.table.planner.plan.schema.{IntermediateRelTable, LegacyTableSourceTable, TableSourceTable}
import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, ExpressionFormat, InputRefVisitor, JoinTypeUtil, LookupJoinUtil, RelExplainUtil, TemporalJoinUtil}
import org.apache.flink.table.planner.plan.utils._
Contributor

nit: normal practice is to list each import explicitly rather than use a wildcard import.

Contributor Author

Thanks @davidradl, addressed 👍

@davidradl (Contributor)

@morazow If I am understanding this correctly, the filtered pushed down joins cause the Volcano planner to uniquely identify each look up join.

I wonder what happens if one of the lookup joins supports filter pushdown but the other doesn't? Does the digest include the name of the source as well as the filter? It would be good to have tests for the cases where both sources support filter pushdown, where neither does, and where only one does.

@morazow changed the title from "[FLINK-368080][table-planner] Fix LookupJoin bug when used with filterable table source" to "[FLINK-36808][table-planner] Fix LookupJoin bug when used with filterable table source" on Apr 28, 2025
@morazow (Contributor Author) commented Apr 28, 2025

Thanks @davidradl, addressed your findings, please have another look 🤝

the filtered pushed down joins cause the Volcano planner to uniquely identify each look up join.

Yes, but the identification is based on the getDigests method, which produces a string like the one below:

LookupJoin(table=[default_catalog.default_database.dim], joinType=[InnerJoin], lookup=[id=id], select=[id, name, txn_time, id], upsertKey=[[0]])

Here we have the table name, but no information about the pushed-down filter conditions. For this issue to occur, the lookup tables must be the same (since the table name is part of the digest), so we cannot write a test case where one side supports filter pushdown and the other doesn't.

Additionally, if the source doesn't support filter pushdown, the condition is represented in the digest with the where keyword. That case does not reproduce the bug either, because the digests are different.

@morazow morazow requested a review from davidradl April 28, 2025 15:38
@xuyangzhong (Contributor) left a comment

Thanks for digging into this bug and driving this fix. I just left some comments.

@@ -209,6 +215,20 @@ abstract class CommonPhysicalLookupJoin(
.itemIf("retry", retryOptions.getOrElse(""), retryOptions.isDefined)
}

private def getTableFilterString(t: TableSourceTable): String = {
val filterOpt = t.abilitySpecs.collectFirst { case spec: FilterPushDownSpec => spec }
Contributor

I'm wondering whether PartitionPushDownSpec also needs to be handled in this part.


super
.explainTerms(pw)
.item("table", tableIdentifier.asSummaryString())
.item("joinType", JoinTypeUtil.getFlinkJoinType(joinType))
.item("lookup", lookupKeys)
.itemIf("where", whereString, whereString.nonEmpty)
.itemIf("filter", filterPushdownString, filterPushdownString.nonEmpty)
Contributor

nit: what about filterPushedDown?

import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

class UnionLookupJoinITCase extends StreamingTestBase {
Contributor

Could you please move this test to LookupJoinITCase?

@github-actions bot added the community-reviewed label (PR has been reviewed by the community.) and removed the component=TableSQL/Planner and community-reviewed labels on Jun 30, 2025
@github-actions bot added and removed the community-reviewed label (PR has been reviewed by the community.) on Jul 13, 2025
Labels: community-reviewed (PR has been reviewed by the community.)