Skip to content

Commit b55563c

Browse files
alarxin
authored and committed
[SPARK-19607] Finding QueryExecution that matches provided executionId
## What changes were proposed in this pull request? Implementing a mapping between executionId and corresponding QueryExecution in SQLExecution. ## How was this patch tested? Adds a unit test. Author: Ala Luszczak <ala@databricks.com> Closes #16940 from ala/execution-id.
1 parent 3973403 commit b55563c

File tree

2 files changed

+41
-0
lines changed

2 files changed

+41
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.sql.execution
1919

20+
import java.util.concurrent.ConcurrentHashMap
2021
import java.util.concurrent.atomic.AtomicLong
2122

2223
import org.apache.spark.SparkContext
@@ -32,6 +33,12 @@ object SQLExecution {
3233

3334
/**
 * Hands out the next execution id by advancing the `_nextExecutionId` counter.
 * NOTE(review): `_nextExecutionId` is declared outside this hunk; it is presumably the
 * `AtomicLong` imported at the top of the file - confirm.
 */
private def nextExecutionId: Long = {
  _nextExecutionId.getAndIncrement()
}
3435

36+
/**
 * Registry of the executions currently in flight, keyed by execution id. `withNewExecutionId`
 * adds an entry before running its body and removes it again in its `finally` block.
 */
private val executionIdToQueryExecution = new ConcurrentHashMap[Long, QueryExecution]()

/**
 * Looks up the [[QueryExecution]] registered under `executionId`.
 *
 * @param executionId id of the execution to look up
 * @return the matching [[QueryExecution]], or `null` when no entry is registered under that id
 *         (the underlying `ConcurrentHashMap.get` returns `null` for a missing key)
 */
def getQueryExecution(executionId: Long): QueryExecution =
  executionIdToQueryExecution.get(executionId)
41+
3542
/**
3643
* Wrap an action that will execute "queryExecution" to track all Spark jobs in the body so that
3744
* we can connect them with an execution.
@@ -44,6 +51,7 @@ object SQLExecution {
4451
if (oldExecutionId == null) {
4552
val executionId = SQLExecution.nextExecutionId
4653
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
54+
executionIdToQueryExecution.put(executionId, queryExecution)
4755
val r = try {
4856
// sparkContext.getCallSite() would first try to pick up any call site that was previously
4957
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
@@ -60,6 +68,7 @@ object SQLExecution {
6068
executionId, System.currentTimeMillis()))
6169
}
6270
} finally {
71+
executionIdToQueryExecution.remove(executionId)
6372
sc.setLocalProperty(EXECUTION_ID_KEY, null)
6473
}
6574
r

sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
2020
import java.util.Properties
2121

2222
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
23+
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
2324
import org.apache.spark.sql.SparkSession
2425

2526
class SQLExecutionSuite extends SparkFunSuite {
@@ -102,6 +103,33 @@ class SQLExecutionSuite extends SparkFunSuite {
102103
}
103104
}
104105

106+
107+
test("Finding QueryExecution for given executionId") {
  val spark = SparkSession.builder.master("local[*]").appName("test").getOrCreate()
  import spark.implicits._

  // Re-arm the shared gate. It lives in a singleton object, so a `true` left behind by an
  // earlier run would let the task below finish before the listener has looked anything up.
  SQLExecutionSuite.canProgress = false

  // Written from the listener callback and read by the test body, hence @volatile.
  @volatile var queryExecution: QueryExecution = null

  try {
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        try {
          // Both the properties object and the execution-id property may be absent for jobs
          // that were not started through SQLExecution; skip the lookup in that case.
          for {
            properties <- Option(jobStart.properties)
            executionIdStr <- Option(properties.getProperty(SQLExecution.EXECUTION_ID_KEY))
          } {
            queryExecution = SQLExecution.getQueryExecution(executionIdStr.toLong)
          }
        } finally {
          // Always open the gate, even if the lookup threw; otherwise the task below would
          // spin forever and the test would hang instead of failing.
          SQLExecutionSuite.canProgress = true
        }
      }
    })

    // The task blocks until the listener has handled the job-start event, which keeps the
    // execution registered in SQLExecution while the lookup above runs.
    val df = spark.range(1).map { x =>
      while (!SQLExecutionSuite.canProgress) {
        Thread.sleep(1)
      }
      x
    }
    df.collect()

    assert(df.queryExecution === queryExecution)
  } finally {
    // Stop the session even when the assertion fails so it does not leak into later tests.
    spark.stop()
  }
}
105133
}
106134

107135
/**
@@ -114,3 +142,7 @@ private class BadSparkContext(conf: SparkConf) extends SparkContext(conf) {
114142
override protected def initialValue(): Properties = new Properties()
115143
}
116144
}
145+
146+
/**
 * Shared state for the "Finding QueryExecution for given executionId" test.
 */
object SQLExecutionSuite {
  /**
   * Gate between the test's job-start listener and its running task: the task sleeps in a loop
   * until the listener's `onJobStart` flips this to `true`. Marked `@volatile` because the two
   * sides access it from different code paths (listener callback vs. task closure).
   */
  @volatile var canProgress: Boolean = false
}

0 commit comments

Comments
 (0)