
[SPARK-52187][SQL] Introduce Join pushdown for DSv2 #50921


Open: wants to merge 2 commits into master from support_join_for_dsv2
Conversation

@PetarVasiljevic-DB (Contributor) commented May 16, 2025

What changes were proposed in this pull request?

With this PR, I am introducing the join pushdown interface for DSv2 connectors and its implementation for JDBC connectors.

The interface itself, SupportsPushDownJoin, has the following API:

public interface SupportsPushDownJoin extends ScanBuilder {
    // Returns true if a join with `other` is worth attempting at the source.
    boolean isRightSideCompatibleForJoin(SupportsPushDownJoin other);

    // Attempts the pushdown; returns false if the source cannot handle the join.
    boolean pushJoin(
            SupportsPushDownJoin other,
            JoinType joinType,
            Optional<Predicate> condition,
            StructType leftRequiredSchema,
            StructType rightRequiredSchema
    );
}

If isRightSideCompatibleForJoin returns true, Spark will attempt to push the join down (the pushdown can still fail).
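For illustration, a connector-side implementation could look roughly like the following sketch. Everything except the SupportsPushDownJoin API itself is hypothetical: the MyScanBuilder class, its compatibility check, and the import paths for the new types are assumptions, not part of this PR.

import java.util.Optional

// Import paths below are assumptions: ScanBuilder lives in
// org.apache.spark.sql.connector.read, so the new interface likely does too.
import org.apache.spark.sql.connector.read.SupportsPushDownJoin
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.join.JoinType // assumed path for the new JoinType
import org.apache.spark.sql.types.StructType

// Hypothetical ScanBuilder opting into join pushdown. Declared abstract so
// the sketch can omit the rest of the ScanBuilder contract (e.g. build()).
abstract class MyScanBuilder extends SupportsPushDownJoin {

  override def isRightSideCompatibleForJoin(other: SupportsPushDownJoin): Boolean =
    // e.g. require both sides to read from the same database connection
    other.isInstanceOf[MyScanBuilder]

  override def pushJoin(
      other: SupportsPushDownJoin,
      joinType: JoinType,
      condition: Optional[Predicate],
      leftRequiredSchema: StructType,
      rightRequiredSchema: StructType): Boolean = {
    // Build a single joined query from both sides here. Returning false
    // makes Spark fall back to a regular, non-pushed join.
    false
  }
}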

With this implementation, only INNER joins are supported; LEFT and RIGHT joins should be added as well. CROSS joins won't be supported, since they can increase the amount of data being read.

Also, none of the production JDBC dialects currently supports join pushdown; it is only enabled for the H2 dialect. The capability is guarded by the SQLConf spark.sql.optimizer.datasourceV2JoinPushdown, the JDBC option pushDownJoin, and the JDBC dialect method supportsJoin.
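For example, enabling the feature end to end might look like the sketch below. The conf and option names are the ones listed above; the H2 URL is a placeholder.

// Enable the optimizer-side switch plus the per-source JDBC option.
spark.conf.set("spark.sql.optimizer.datasourceV2JoinPushdown", "true")

def readTable(table: String) = spark.read
  .format("jdbc")
  .option("url", "jdbc:h2:mem:testdb") // placeholder URL
  .option("dbtable", table)
  .option("pushDownJoin", "true")      // per-source guard for join pushdown
  .load()

// With both guards on (and a dialect whose supportsJoin returns true),
// an inner equi-join of two such relations is eligible for pushdown.
val p = readTable("call_center")
val q = readTable("call_center")
val joined = p.join(q, p("cc_call_center_id") === q("cc_call_center_id"))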

For the following Spark query over JDBC tables:

SELECT 
    p.cc_call_center_id, 
    q.cc_call_center_id, 
    q.cc_city 
FROM 
    call_center p 
    JOIN 
    call_center q 
    ON p.cc_call_center_id = q.cc_call_center_id

the SQL query generated on the Spark side would be:

SELECT
    "subquery_176_col_0",
    "subquery_176_col_1",
    "subquery_176_col_2"
FROM (
    SELECT
        "subquery_174"."cc_call_center_id" AS "subquery_176_col_0",
        "subquery_175"."cc_call_center_id" AS "subquery_176_col_1",
        "subquery_175"."cc_city" AS "subquery_176_col_2"
    FROM (
        SELECT "cc_call_center_sk", ... FROM "CALL_CENTER" WHERE ("cc_call_center_id" IS NOT NULL)
    ) "subquery_174"
    INNER JOIN (
        SELECT "cc_call_center_sk", ... FROM "CALL_CENTER" WHERE ("cc_call_center_id" IS NOT NULL)
    ) "subquery_175"
    ON "subquery_174"."cc_call_center_id" = "subquery_175"."cc_call_center_id"
) SPARK_GEN_SUBQ_114

Why are the changes needed?

DSv2 connectors can't push down the join operator.

Does this PR introduce any user-facing change?

No user-facing change from this PR itself, since the behaviour is not enabled for any connector (besides H2, which is the JDBC dialect used for testing).

How was this patch tested?

New tests and some local testing with TPCDS queries.

Was this patch authored or co-authored using generative AI tooling?

@github-actions bot added the SQL label on May 16, 2025
@PetarVasiljevic-DB changed the title from "introduce join pushdown for dsv2" to "[SPARK-52187] Introduce Join pushdown for DSv2" on May 16, 2025
@HyukjinKwon changed the title from "[SPARK-52187] Introduce Join pushdown for DSv2" to "[SPARK-52187][SQL] Introduce Join pushdown for DSv2" on May 19, 2025
public String[] qualifier;
public String name;

Contributor:

why do we need to separate qualifier and name? I think JoinColumn should be the same as NamedReference with an additional isInLeftSideOfJoin flag.

Contributor Author:

We can have a similar implementation to FieldReference, with parts and isInLeftSideOfJoin fields, but I find the separation between qualifier and name nicer because it makes the code cleaner in some ways.

If you take a look at JDBCScanBuilder.pushJoin, we pass a condition that contains JoinColumns as leaf expressions, but these are not yet qualified. They are qualified later, in the qualifyCondition method.

Without the qualifier/name separation, with just parts: Seq[String], I would need to do array shifting, which is fine, but I just find my way nicer.

I can, however, change the implementation of JoinColumn to be something like:

private[sql] final case class JoinColumn(
    parts: Seq[String],
    isInLeftSideOfJoin: Boolean)
  extends NamedReference {
  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
  override def fieldNames(): Array[String] = parts.toArray
}

Honestly, I am fine with both.

import java.util.Map;

/**
 * The builder to generate SQL for a specific join type.
 */
Contributor:

I'm wondering if this is really needed. The join type string is quite simple, and Spark doesn't need to provide a helper to do it.

Contributor Author:

It might be redundant. The reason I have it is to answer the following question: what if some dialect names a specific join type differently? For example, what if there is a dialect that doesn't support the CROSS JOIN syntax but only JOIN?

We can get the same effect with a simple string comparison in the dialects, so we can get rid of it if you find it overkill.
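The string-comparison alternative mentioned here could be as small as the following dialect-side sketch; the method name is illustrative, not part of the PR.

// Hypothetical dialect hook: map the join type keyword Spark emits to the
// syntax this particular dialect actually accepts.
def joinTypeToSQL(joinType: String): String = joinType match {
  case "CROSS JOIN" => "JOIN" // this dialect has no CROSS JOIN keyword
  case other        => other
}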

@@ -174,6 +178,12 @@ protected String visitNamedReference(NamedReference namedRef) {
    return namedRef.toString();
  }

  protected String visitJoinColumn(JoinColumn column) {
Contributor:

Shall we fail by default? The implementations must provide the left/right side alias as context in order to generate the column name.

Contributor Author:

> the implementations must provide the left/right side alias as a context, in order to generate the column name.

Not really. The way I designed this, the JoinColumn will already have the left/right side alias before it is visited, so I think this implementation is valid.
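In other words, by the time visitJoinColumn runs, the qualifier is already populated, so the default implementation can render the column directly. A rough sketch of that idea, using the qualifier/name fields shown earlier (illustrative, not the PR's exact code):

// e.g. qualifier = Seq("subquery_174"), name = "cc_call_center_id"
// renders as: "subquery_174"."cc_call_center_id"
def visitJoinColumn(column: JoinColumn): String =
  (column.qualifier :+ column.name).map(part => s"\"$part\"").mkString(".")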

// SALARY#0, NAME#1, DEPT#1. This is done by adding a projection with appropriate aliases.
val projectList = realOutput.zip(holder.output).map { case (a1, a2) =>
  val originalName = holder.exprIdToOriginalName(a2.exprId)
  Alias(a1, originalName)(a2.exprId)
}
Contributor:

Is originalName always a2.name?

Contributor Author:

No. a2 comes from holder.output, which has aliased names in the format subquery_x_col_y.

The original names are saved into sHolder at the time of its creation, in createScanBuilder.

Does that answer your question?
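To make the exchange concrete, here is a small worked illustration of the mapping being discussed (the ExprIds are made up):

// holder.output (post-pushdown):  subquery_176_col_0#10, subquery_176_col_1#11
// holder.exprIdToOriginalName:    #10 -> "cc_call_center_id", #11 -> "cc_city"
//
// The projection re-aliases each attribute back to its original name while
// reusing the same ExprId, so operators above the scan keep resolving
// against unchanged attribute references.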

@@ -573,6 +701,13 @@ case class ScanBuilderHolder(
  var pushedAggregate: Option[Aggregation] = None

  var pushedAggOutputMap: AttributeMap[Expression] = AttributeMap.empty[Expression]

  var joinedRelations: Seq[DataSourceV2RelationBase] = Seq()
Contributor:

Does joinedRelations.isEmpty indicate that isJoinPushed is false?

Contributor Author:

Yes, we can reuse joinedRelations.isEmpty instead of isJoinPushed. I will make that change.
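The agreed simplification could be as small as deriving the flag from existing state (a sketch):

// ScanBuilderHolder already tracks the joined relations, so the boolean
// can be derived instead of stored separately.
def isJoinPushed: Boolean = joinedRelations.nonEmpty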

@beliefer (Contributor) commented:

@cloud-fan Shall we support join pushdown for DSv2?

@PetarVasiljevic-DB force-pushed the support_join_for_dsv2 branch 2 times, most recently from 98350e2 to ba482f1 on May 20, 2025 at 15:29