[SPARK-32748][SQL] Support local property propagation in SubqueryBroadcastExec #29589

Closed
wzhfy wants to merge 4 commits

Conversation

@wzhfy (Contributor) commented Aug 31, 2020

What changes were proposed in this pull request?

Since SPARK-22590, local property propagation has been supported through SQLExecution.withThreadLocalCaptured in both BroadcastExchangeExec and SubqueryExec when computing relationFuture. This PR adds the same support to SubqueryBroadcastExec.
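For context, SQLExecution.withThreadLocalCaptured exists because cached pool threads keep whatever thread-local values they inherited when they were created, so a reused worker thread can observe stale local properties. A self-contained sketch of the mechanism (the names PropagationDemo, localProps, and withCaptured are illustrative, not Spark's API):

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}

object PropagationDemo {
  // Stand-in for Spark's per-thread local properties (illustrative only).
  private val localProps = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  // Mimics the idea behind SQLExecution.withThreadLocalCaptured: capture the
  // caller's value eagerly, then install it on the pool thread around the body.
  def withCaptured[T](body: => T)(implicit ec: ExecutionContext): Future[T] = {
    val captured = localProps.get()
    Future {
      val saved = localProps.get()
      localProps.set(captured)
      try body finally localProps.set(saved)
    }
  }

  // Returns (value seen without capture, value seen with capture).
  def demo(): (String, String) = {
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    try {
      localProps.set("first")
      // Force the single pool thread to be created now; it inherits "first".
      Await.result(Future(localProps.get()), 10.seconds)
      localProps.set("second")
      // The reused pool thread still sees the stale inherited value...
      val stale = Await.result(Future(localProps.get()), 10.seconds)
      // ...unless the caller's current value is explicitly captured.
      val fresh = Await.result(withCaptured(localProps.get()), 10.seconds)
      (stale, fresh)
    } finally ec.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (stale, fresh) = demo()
    println(s"without capture: $stale, with capture: $fresh")
  }
}
```

Running main prints `without capture: first, with capture: second` — exactly the staleness this PR guards against.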

Why are the changes needed?

Local property propagation is missing in SubqueryBroadcastExec.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added a new test.

@wzhfy (Contributor, Author) commented Aug 31, 2020

cc @cloud-fan @maryannxue @ajithme

@SparkQA commented Aug 31, 2020

Test build #128072 has finished for PR 29589 at commit 9770032.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 2, 2020

Test build #128173 has finished for PR 29589 at commit 7d28b53.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

try {
SQLConf.get.setConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD, 1)
Contributor:

does this really work? It's a static conf. As long as an earlier test triggers DPP, this won't work.

Contributor (Author):

Yes, you are right... but it's not because it's a static conf; it's because executionContext is in the SubqueryBroadcastExec object. This makes it hard to write a unit test. Do you have any suggestions?

Contributor:

Actually it was the same before: https://github.com/apache/spark/pull/27266/files#diff-6dabaf3c491be73cc0479b1f368d157bR164

Can we move this test there? Then it's easier to read, as these special tests are put together.

Contributor (Author):

Thanks for the advice! Updated.

spark.sparkContext.setLocalProperty(propKey, propValue)
val df = sql(
s"""
|SELECT compare_property_value(f.date_id, '$propKey', '$propValue') as col
Member:

Is compare_property_value really evaluated in SubqueryBroadcastExec threads?

Contributor (Author):

Yes, IIUC a UDF created by spark.udf.register is not foldable, so it will be evaluated in tasks. Besides, TaskContext.get() would return null if it were not run in a task.

@SparkQA commented Sep 3, 2020

Test build #128255 has finished for PR 29589 at commit 1cb65ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2020

Test build #128303 has finished for PR 29589 at commit aabc23f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 4, 2020

Test build #128302 has finished for PR 29589 at commit 2f70782.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan (Contributor) commented:

thanks, merging to master!

@cloud-fan cloud-fan closed this in 04f7f6d Sep 7, 2020
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
Future {
SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
@maropu (Member) commented Sep 7, 2020:

Looks like the added test can pass without this fix (I think the comment seems related to this issue). Could you check this again? @wzhfy

Contributor (Author):

Sure, I'll check.

Member:

Thanks!

@wzhfy (Contributor, Author) commented Sep 8, 2020:

@maropu I think you are right; sorry, I misunderstood your comment previously. Because SubqueryBroadcastExec.relationFuture really just calls child.executeBroadcast[HashedRelation]().value, it uses the "broadcast-exchange" threads.

As a result, it seems that propagation of thread-local properties in SubqueryBroadcastExec is not necessary; they will be propagated by the broadcast threads anyway.

BTW, the current test case is also not reasonable, since the UDF is not evaluated in the broadcasted part. I moved it into a broadcast filter (like below) and found the above problem. Thanks for pointing this out!

  test("SPARK-32748: propagate local properties to dynamic pruning thread") {
    val factTable = "fact_local_prop_dpp"
    val dimTable = "dim_local_prop_dpp"
    val filteringValue = 3

    def checkPropertyValueByUdfResult(
        propKey: String,
        propValue: String,
        expectedResultCount: Int): Unit = {
      spark.sparkContext.setLocalProperty(propKey, propValue)
      val df = sql(
        s"""
           |SELECT f.id
           |FROM $factTable f
           |INNER JOIN $dimTable s
           |ON f.id = s.id
           |AND compare_property_value(s.value, '$propKey', '$propValue') = $filteringValue
          """.stripMargin)

      val subqueryBroadcastSeq = df.queryExecution.executedPlan.flatMap {
        case s: FileSourceScanExec => s.partitionFilters.collect {
          case DynamicPruningExpression(InSubqueryExec(_, b: SubqueryBroadcastExec, _, _)) => b
        }
        case _ => Nil
      }
      assert(subqueryBroadcastSeq.nonEmpty,
        s"Should trigger DPP with a reused broadcast exchange:\n${df.queryExecution}")

      assert(df.collect().length == expectedResultCount)
    }

    withTable(factTable, dimTable) {
      spark.range(10).select($"id", $"id".as("value"))
        .write.partitionBy("id").mode("overwrite").saveAsTable(factTable)
      spark.range(5).select($"id", $"id".as("value"))
        .write.mode("overwrite").saveAsTable(dimTable)

      withSQLConf(
        StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD.key -> "1",
        SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {

        try {
          spark.udf.register(
            "compare_property_value",
            (input: Int, propKey: String, propValue: String) => {
              if (TaskContext.get().getLocalProperty(propKey) == propValue) {
                filteringValue
              } else {
                input
              }
            }
          )
          val propKey = "spark.sql.subquery.broadcast.prop.key"

          // set local property and assert
          val propValue1 = UUID.randomUUID().toString()
          checkPropertyValueByUdfResult(propKey, propValue1, expectedResultCount = 5)

          // change local property and re-assert
          val propValue2 = UUID.randomUUID().toString()
          checkPropertyValueByUdfResult(propKey, propValue2, expectedResultCount = 5)
        } finally {
          spark.sessionState.catalog.dropTempFunction("compare_property_value", true)
        }
      }
    }
  }

@wzhfy (Contributor, Author) commented Sep 8, 2020:

@maropu @cloud-fan So should we revert this PR, and raise a separate JIRA for configuring the number of subquery broadcast threads (maybe using a different config name than StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)?

object SubqueryBroadcastExec {
  private[execution] val executionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("dynamic-pruning",
      SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD)))
}
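Note that the pool size above is read from the static conf once, when the companion object is first initialized, which is why flipping the conf inside a test has no effect if an earlier test already triggered DPP. A rough, self-contained approximation of the kind of pool ThreadUtils.newDaemonCachedThreadPool builds (the helper name, keep-alive, and queue choice here are assumptions, not Spark's exact implementation):

```scala
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger

object ThreadPoolSketch {
  // Bounded, daemon, cached-style thread pool with named threads.
  def newDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true) // don't keep the JVM alive for pool threads
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable](), factory)
    pool.allowCoreThreadTimeOut(true) // let idle threads die, like a cached pool
    pool
  }
}
```

Because the pool is a val in a singleton object, whatever value the static conf holds at first use is baked in for the lifetime of the JVM.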

Contributor:

Oh, then please revert it.

I don't think we need a separate config. The thread pool here is used to wait for the broadcast to finish; it seems better to have the same number of slots as the broadcast exchange thread pool.

Member:

sgtm

Contributor (Author):

> I don't think we need a separate config. The thread pool here is used to wait for the broadcast to finish, seems better to have same number of slots compared to broadcast exchange thread pool.

@cloud-fan The previous number of subquery broadcast threads is 16, while the default number of broadcast exchange threads is 128.

Contributor:

Let's open a new PR to change the number of threads and discuss there. 16 threads are probably fine, too.

Contributor:

It would be good if we could set this flag.

@wzhfy @maropu @cloud-fan Sorry if I am late to the party. We should consider keeping this. It is generally good practice to use a narrow waist for these things in the code. It will save someone in the future some time debugging why their change is not working :).

maropu pushed a commit that referenced this pull request Sep 8, 2020:
Revert "[SPARK-32748][SQL] Support local property propagation in SubqueryBroadcastExec"

### What changes were proposed in this pull request?

This reverts commit 04f7f6d due to the discussion in [comment](#29589 (comment)).

### Why are the changes needed?

Based on the discussion in [comment](#29589 (comment)), propagation for thread local properties in `SubqueryBroadcastExec` is not necessary, since they will be propagated by broadcast exchange threads anyway.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Also revert the added test.

Closes #29674 from wzhfy/revert_dpp_thread_local.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>