[SPARK-52187][SQL] Introduce Join pushdown for DSv2 #50921
Conversation
}

public String[] qualifier;
public String name;
why do we need to separate qualifier and name? I think JoinColumn should be the same as NamedReference with an additional isInLeftSideOfJoin flag.
We could have a similar implementation to FieldReference, with parts and isInLeftSideOfJoin fields, but I find the separation between qualifier and name nicer because it makes the code cleaner in some way.
If you take a look at JDBCScanBuilder.pushJoin, we are passing the condition that contains JoinColumns as leaf expressions, but these are not yet qualified. I am qualifying them later on, in the qualifyCondition method.
Without the qualifier/name separation, and with parts: Seq[String], I would need to do array shifting, which is fine, but I just find it nicer my way.
I can however change the implementation of JoinColumn to be something like:

private[sql] final case class JoinColumn(
    parts: Seq[String],
    isInLeftSideOfJoin: Boolean)
  extends NamedReference {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
  override def fieldNames(): Array[String] = parts.toArray
}
Honestly, I am fine with both
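For contrast, here is a minimal Scala sketch of the qualifier/name variant described above; the actual JoinColumn in this PR is a Java class, and the class name and members used below are assumptions for illustration only.

import org.apache.spark.sql.connector.expressions.NamedReference

// Illustrative sketch, not the PR's implementation: the qualifier holds the
// subquery alias parts and name holds the bare column name, so qualifying the
// column later does not require any array shifting.
final case class QualifiedJoinColumn(
    qualifier: Seq[String],
    name: String,
    isInLeftSideOfJoin: Boolean)
  extends NamedReference {
  // The fully qualified path is simply the qualifier followed by the name.
  override def fieldNames(): Array[String] = (qualifier :+ name).toArray
}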
import java.util.Map;

/**
 * The builder to generate SQL for specific Join type.
I'm wondering if this is really needed. The join type string is quite simple, and Spark doesn't need to provide a helper to do it.
It might be redundant. The reason why I have it is simply to answer the following question: what if some dialect names a specific join type differently? For example, what if there is a dialect that doesn't support the CROSS JOIN syntax but only JOIN?
We can get the same effect with plain string comparison in the dialects, so we can get rid of it if you find it overkill.
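As a rough illustration of the string-comparison alternative mentioned above (the object and method names below are hypothetical, not part of this PR's API):

// Hypothetical dialect-side hook: a dialect that lacks the CROSS JOIN keyword
// can rewrite the join-type string itself, so no dedicated builder class is needed.
object MyDialectJoinSql {
  def rewriteJoinType(joinType: String): String = joinType match {
    case "CROSS JOIN" => "JOIN" // assumed fallback for a dialect without CROSS JOIN
    case other        => other
  }
}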
@@ -174,6 +178,12 @@ protected String visitNamedReference(NamedReference namedRef) {
    return namedRef.toString();
  }

  protected String visitJoinColumn(JoinColumn column) {
shall we fail by default? the implementations must provide the left/right side alias as a context, in order to generate the column name.
"the implementations must provide the left/right side alias as a context, in order to generate the column name"

Not really. The way I designed this, the JoinColumn already carries the left/right side alias before it is visited, so I think this implementation is valid.
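A minimal sketch of what that default could look like under this design, assuming the JoinColumn is already qualified with its side's alias (identifier quoting is omitted, and this is not the PR's actual visitor code):

import org.apache.spark.sql.connector.expressions.NamedReference

object JoinColumnSqlSketch {
  // Since the column already carries the left/right side alias in its field
  // names, the visitor only has to dot-join them to produce the SQL reference.
  def visitJoinColumn(column: NamedReference): String =
    column.fieldNames().mkString(".")
}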
// SALARY#0, NAME#1, DEPT#1. This is done by adding projection with appropriate aliases.
val projectList = realOutput.zip(holder.output).map { case (a1, a2) =>
  val originalName = holder.exprIdToOriginalName(a2.exprId)
  Alias(a1, originalName)(a2.exprId)
is originalName always a2.name?
No. a2 comes from holder.output, which will have aliased names in the format subquery_x_col_y.
The original names are saved into sHolder at the time of its creation in createScanBuilder.
Does that answer your question?
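A toy illustration of the aliasing pattern being described, using made-up names like subquery_0_col_0 and SALARY (this is only the Alias-with-original-ExprId idea, not the actual rule code):

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.IntegerType

// The pushed-down scan exposes an internally aliased column ...
val scanOutput = AttributeReference("subquery_0_col_0", IntegerType)()
// ... while the parent plan still references the original attribute.
val original = AttributeReference("SALARY", IntegerType)()
// The projection restores the original name and reuses the original ExprId,
// so references higher in the plan keep resolving.
val restored = Alias(scanOutput, "SALARY")(exprId = original.exprId)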
@@ -573,6 +701,13 @@ case class ScanBuilderHolder(
  var pushedAggregate: Option[Aggregation] = None

  var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]

  var joinedRelations: Seq[DataSourceV2RelationBase] = Seq()
does joinedRelations.isEmpty indicate isJoinPushed as false?
Yes, we can reuse joinedRelations.isEmpty instead of isJoinPushed. I will make that change.
@cloud-fan Shall we support join pushdown for DSv2?
What changes were proposed in this pull request?
With this PR I am introducing the join pushdown interface for DSv2 connectors and its implementation for JDBC connectors.
The interface itself, SupportsPushDownJoin, has the following API:
If isRightSideCompatibleForJoin is true, then pushing down the join will be attempted (it can still fail, though). With this implementation, only inner joins are supported; left and right joins should be added as well. Cross joins won't be supported since they can increase the amount of data that is being read.
Also, none of the dialects currently supports the join pushdown; it is only available for the H2 dialect. The join pushdown capability is guarded by the SQLConf spark.sql.optimizer.datasourceV2JoinPushdown, the JDBC option pushDownJoin, and the JDBC dialect method supportsJoin.
When a join between two JDBC relations is pushed down, Spark generates a single SQL query that performs the join on the database side.
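A hypothetical end-to-end sketch of how this could be exercised, assuming two H2-backed JDBC tables named EMPLOYEES and DEPARTMENTS; the connection URL, table and column names, and the commented SQL shape are assumptions for illustration, not the PR's actual example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("join-pushdown-demo").getOrCreate()

// Enable the optimizer flag and the per-relation JDBC option mentioned above.
spark.conf.set("spark.sql.optimizer.datasourceV2JoinPushdown", "true")

def jdbcTable(table: String) = spark.read.format("jdbc")
  .option("url", "jdbc:h2:mem:testdb") // placeholder connection string
  .option("dbtable", table)
  .option("pushDownJoin", "true")      // JDBC option guarding the pushdown
  .load()

val employees = jdbcTable("EMPLOYEES")
val departments = jdbcTable("DEPARTMENTS")

// An inner join between two relations coming from the same JDBC source is a
// candidate for pushdown, so a single query can be sent to the database,
// roughly of the shape (exact SQL text is an assumption):
//
//   SELECT ... FROM (SELECT ... FROM EMPLOYEES) subquery_0
//   JOIN (SELECT ... FROM DEPARTMENTS) subquery_1
//   ON subquery_0.DEPT = subquery_1.DEPT
val joined = employees.join(departments, employees("DEPT") === departments("DEPT"))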
Why are the changes needed?
DSv2 connectors can't push down the join operator.
Does this PR introduce any user-facing change?
This PR itself does not, since the behaviour is not implemented for any of the connectors (besides H2, which is a testing JDBC dialect).
How was this patch tested?
New tests and some local testing with TPCDS queries.
Was this patch authored or co-authored using generative AI tooling?