Commit c4ce3c4

alarxin authored and committed
[SC-5985] Add spark.databricks.debug.taskKiller.minOutputRows config.
## What changes were proposed in this pull request?

Adds a configuration parameter, spark.databricks.debug.taskKiller.minOutputRows. It sets the minimum number of records that must have been produced at some point during task execution before the task can be terminated by DatabricksTaskDebugListener.

## How was this patch tested?

Adds unit tests.

Author: Ala Luszczak <ala@databricks.com>

Closes apache#250 from ala/min-output-rows.
1 parent 96022f2 commit c4ce3c4
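
In outline, after this patch the listener terminates a task only when four conditions hold at once. The snippet below is a minimal sketch distilled from the diffs that follow, not the actual DatabricksTaskDebugListener code; all names mirror the patch.

  // Sketch only: the kill decision after this patch, pulled out as a pure
  // function for readability. The Long division matches the listener's code.
  def shouldKill(
      runningTimeSec: Long,
      minRunningTimeSec: Long,
      recordsIn: Long,
      recordsOut: Long,
      minOutputRows: Long,
      outputRatioKillThreshold: Long): Boolean = {
    val outputRatio = if (recordsIn > 0) recordsOut / recordsIn else 0
    runningTimeSec > minRunningTimeSec &&
      recordsOut >= minOutputRows &&            // the new guard added here
      outputRatioKillThreshold > 0 &&
      outputRatio > outputRatioKillThreshold
  }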

File tree

3 files changed: +56 -15 lines changed

sql/core/src/main/scala/com/databricks/sql/DatabricksSQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -114,6 +114,13 @@ object DatabricksSQLConf {
     .longConf
     .createWithDefault(10L)
 
+  val TASK_KILLER_MIN_OUTPUT_ROWS = buildConf("spark.databricks.debug.taskKiller.minOutputRows")
+    .internal()
+    .doc("The minimum number of rows that need to be produced by the task before it can be " +
+      "cancelled.")
+    .longConf
+    .createWithDefault(1000L * 1000L)
+
   val TASK_KILLER_ERROR_MESSAGE = buildStaticConf("spark.databricks.debug.taskKiller.message")
     .internal()
     .doc("The error message to be displayed when a task is terminated by DatabricksTaskDebugListener.")
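
Since the new entry is registered with buildConf rather than buildStaticConf (unlike TASK_KILLER_ERROR_MESSAGE below it), it should be adjustable per session. A hedged usage sketch, assuming a runtime where this internal conf is available:

  // Sketch only: lowering the threshold from its default of 1,000,000 rows.
  // The key string comes from the definition above; the value is arbitrary.
  spark.conf.set("spark.databricks.debug.taskKiller.minOutputRows", 500000L)

  // Engine code reads it through the typed entry instead, as the next diff shows:
  // spark.sessionState.conf.getConf(DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS)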

sql/core/src/main/scala/com/databricks/sql/debugger/DatabricksTaskDebugListener.scala

Lines changed: 8 additions & 5 deletions
@@ -78,25 +78,28 @@ class DatabricksTaskDebugListener(
       }
     }
 
-    val outputRatio = if (recordsIn > 0) recordsOut / recordsIn else 0
-    checkOutputRatio(outputRatio)
+    checkOutputRatio(recordsIn, recordsOut)
   }
 
   /**
    * Compare running time and output ratio with the configured limits.
    * If needed, request task cancellation.
    */
-  private def checkOutputRatio(outputRatio: Long): Unit = {
+  private def checkOutputRatio(recordsIn: Long, recordsOut: Long): Unit = {
+    val outputRatio = if (recordsIn > 0) recordsOut / recordsIn else 0
+
     val queryExecution = SQLExecution.getQueryExecution(executionId)
     if (!cancelRequestIssued || launchTime > 0 || queryExecution != null) {
       val minRunningTimeSec = queryExecution.sparkSession.sessionState.conf.getConf(
         DatabricksSQLConf.TASK_KILLER_MIN_TIME)
+      val minOutputRows = queryExecution.sparkSession.sessionState.conf.getConf(
+        DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS)
       val outputRatioKillThreshold = queryExecution.sparkSession.sessionState.conf.getConf(
         DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD)
       val runningTimeSec = (System.currentTimeMillis() - launchTime) / 1000
 
-      if (runningTimeSec > minRunningTimeSec && outputRatioKillThreshold > 0 &&
-          outputRatio > outputRatioKillThreshold) {
+      if (runningTimeSec > minRunningTimeSec && recordsOut >= minOutputRows &&
+          outputRatioKillThreshold > 0 && outputRatio > outputRatioKillThreshold) {
         val errorMsgTemplate = queryExecution.sparkSession.sessionState.conf.getConf(
           DatabricksSQLConf.TASK_KILLER_ERROR_MESSAGE)
         terminateTask(outputRatio, outputRatioKillThreshold, errorMsgTemplate)
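
One subtlety when picking thresholds: outputRatio is computed with Long division, so it only takes whole-number values. An illustrative snippet, not part of the patch:

  // Illustrative only: Long division truncates, so a task emitting 100.9x its
  // input reports a ratio of 100 and does not exceed a threshold of 100.
  val recordsIn = 1000L
  val recordsOut = 100900L
  val outputRatio = recordsOut / recordsIn  // 100L, not 100.9
  assert(outputRatio == 100L)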

sql/core/src/test/scala/com/databricks/sql/debugger/DatabricksTaskDebugListenerSuite.scala

Lines changed: 41 additions & 10 deletions
@@ -16,7 +16,7 @@ import com.databricks.sql.DatabricksSQLConf
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -27,25 +27,33 @@ class DatabricksTaskDebugListenerSuite
   with SQLTestUtils
   with Eventually {
 
+  import testImplicits._
+
   val CART_PROD_INPUT_SIZE = 100000L
-  var prevKillerOutputRatioThreshold = 0L
+  var prevKillerOutputRatio = 0L
   var prevKillerMinTime = 0L
+  var prevMinOutputRows = 0L
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
 
-    prevKillerOutputRatioThreshold = spark.sessionState.conf.getConf(
-      DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD)
-    prevKillerMinTime = spark.sessionState.conf.getConf(DatabricksSQLConf.TASK_KILLER_MIN_TIME)
+    val conf = spark.sessionState.conf
+
+    prevKillerOutputRatio = conf.getConf(DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD)
+    prevKillerMinTime = conf.getConf(DatabricksSQLConf.TASK_KILLER_MIN_TIME)
+    prevMinOutputRows = conf.getConf(DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS)
 
-    spark.sessionState.conf.setConf(DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD, 100L)
-    spark.sessionState.conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_TIME, 5L)
+    conf.setConf(DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD, 100L)
+    conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_TIME, 5L)
+    conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS, 1000L)
   }
 
   protected override def afterAll(): Unit = {
-    spark.sessionState.conf.setConf(DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD,
-      prevKillerOutputRatioThreshold)
-    spark.sessionState.conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_TIME, prevKillerMinTime)
+    val conf = spark.sessionState.conf
+    conf.setConf(DatabricksSQLConf.TASK_KILLER_OUTPUT_RATIO_THRESHOLD, prevKillerOutputRatio)
+    conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_TIME, prevKillerMinTime)
+    conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS, prevMinOutputRows)
+
     super.afterAll()
   }
 
@@ -112,4 +120,27 @@ class DatabricksTaskDebugListenerSuite
       }
     }
   }
+
+  // Create a query that takes ca. 20 seconds to process (because heartbeats with metrics are sent
+  // around 10 seconds apart), but doesn't produce too much output (max 250,000 records) at any
+  // point in execution.
+  def gen20SecQuery: DataFrame = {
+    spark.range(200L).repartition(1).map { x =>
+      // Trickle out 10 rows per second
+      Thread.sleep(100)
+      x
+    }.crossJoin(spark.range(1000L)).toDF("a", "b").agg(sum("a"), sum("b"))
+  }
+
+  test("spark.databricks.debug.taskKiller.minOutputRows = 1,000,000 - query is not killed") {
+    spark.sessionState.conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS, 1000L * 1000L)
+    gen20SecQuery.collect()
+  }
+
+  test("spark.databricks.debug.taskKiller.minOutputRows = 1000 - the same query is terminated") {
+    spark.sessionState.conf.setConf(DatabricksSQLConf.TASK_KILLER_MIN_OUTPUT_ROWS, 1000L)
+    testTaskTermination {
+      gen20SecQuery.collect()
+    }
+  }
 }
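
As a rough sanity check on why the two new tests diverge, consider a metrics heartbeat observed mid-flight through the cross join. The numbers below are assumptions for illustration only; the real counts depend on when a heartbeat fires.

  // Hypothetical heartbeat snapshot, not measured output.
  val recordsIn = 1200L      // assumed: range(200) plus range(1000), fully read
  val recordsOut = 150000L   // assumed: partial cross-join output, under the 250,000 cap
  val outputRatio = recordsOut / recordsIn  // 125, above the configured threshold of 100

  // With minOutputRows = 1,000,000: 150,000 < 1,000,000, so the kill is suppressed.
  // With minOutputRows = 1,000:     150,000 >= 1,000, so the task is terminated.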
