Commit 13cfc5b

stczwd and jackylee-ch authored and committed
[SPARK-37831][CORE] Add task partition id in TaskInfo and Task Metrics
### Why are the changes needed?

There is no partition ID in the current task metrics, which makes it difficult to track the metrics of the task for a specific partition, or the stage metrics for processing data, especially when the stage is retried. For reference, the current `TaskData` has no partition ID field:

```
class TaskData private[spark](
    val taskId: Long,
    val index: Int,
    val attempt: Int,
    val launchTime: Date,
    val resultFetchStart: Option[Date],
    @JsonDeserialize(contentAs = classOf[JLong])
    val duration: Option[Long],
    val executorId: String,
    val host: String,
    val status: String,
    val taskLocality: String,
    val speculative: Boolean,
    val accumulatorUpdates: Seq[AccumulableInfo],
    val errorMessage: Option[String] = None,
    val taskMetrics: Option[TaskMetrics] = None,
    val executorLogs: Map[String, String],
    val schedulerDelay: Long,
    val gettingResultTime: Long)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added new tests.

Closes #35185 from jackylee-ch/SPARK-37831.

Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jacky Lee <qcsd2011@gmail.com>
Co-authored-by: jackylee-ch <lijunqing@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent a40acd4 commit 13cfc5b
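For a sense of how the new field is consumed downstream, here is a minimal sketch of a listener that reads `partitionId` from `TaskInfo`. The listener class and the logging are illustrative, not part of this commit; only `taskInfo.partitionId` and its `-1` sentinel come from the change itself:

```
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative listener: partitionId is stable across stage retries,
// so metrics can be keyed by partition rather than by task index.
class PartitionMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    if (info.partitionId >= 0) {  // -1 means pre-3.3 historical data
      println(s"stage=${taskEnd.stageId} attempt=${taskEnd.stageAttemptId} " +
        s"partition=${info.partitionId} duration=${info.duration} ms")
    }
  }
}

// Usage (illustrative): sc.addSparkListener(new PartitionMetricsListener)
```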

49 files changed: +1581 −47 lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 22 additions & 0 deletions

```
@@ -35,12 +35,34 @@ class TaskInfo(
      */
     val index: Int,
     val attemptNumber: Int,
+    /**
+     * The actual RDD partition ID in this task.
+     * The ID of the RDD partition is always the same across task attempts.
+     * This will be -1 for historical data, and available for all applications since Spark 3.3.
+     */
+    val partitionId: Int,
     val launchTime: Long,
     val executorId: String,
     val host: String,
     val taskLocality: TaskLocality.TaskLocality,
     val speculative: Boolean) {
 
+  /**
+   * This constructor doesn't contain partitionId; please use the new one.
+   * It is kept for backward compatibility with code written before Spark 3.3.
+   */
+  def this(
+      taskId: Long,
+      index: Int,
+      attemptNumber: Int,
+      launchTime: Long,
+      executorId: String,
+      host: String,
+      taskLocality: TaskLocality.TaskLocality,
+      speculative: Boolean) = {
+    this(taskId, index, attemptNumber, -1, launchTime, executorId, host, taskLocality, speculative)
+  }
+
   /**
    * The time when the task started remotely getting the result. Will not be set if the
    * task result was sent immediately when the task finished (as opposed to sending an
```
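To illustrate the compatibility path the auxiliary constructor preserves, here is a hypothetical call against the old eight-argument signature (all values made up), which now delegates to the new constructor with the `-1` "unknown partition" sentinel:

```
import org.apache.spark.scheduler.{TaskInfo, TaskLocality}

// Old eight-argument signature still compiles and runs; partitionId
// is filled in with the -1 sentinel used for historical data.
val legacy = new TaskInfo(
  1L, 0, 0, 1642039450519L, "exec-1", "host-1",
  TaskLocality.PROCESS_LOCAL, speculative = false)
assert(legacy.partitionId == -1)
```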

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 2 additions & 1 deletion

```
@@ -509,7 +509,8 @@ private[spark] class TaskSetManager(
     // Do various bookkeeping
     copiesRunning(index) += 1
     val attemptNum = taskAttempts(index).size
-    val info = new TaskInfo(taskId, index, attemptNum, launchTime,
+    val info = new TaskInfo(
+      taskId, index, attemptNum, task.partitionId, launchTime,
       execId, host, taskLocality, speculative)
     taskInfos(taskId) = info
     taskAttempts(index) = info :: taskAttempts(index)
```

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 2 additions & 1 deletion

```
@@ -743,7 +743,8 @@ private[spark] class AppStatusStore(
     })
 
     new v1.TaskData(taskDataOld.taskId, taskDataOld.index,
-      taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart,
+      taskDataOld.attempt, taskDataOld.partitionId,
+      taskDataOld.launchTime, taskDataOld.resultFetchStart,
       taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status,
       taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates,
       taskDataOld.errorMessage, taskDataOld.taskMetrics,
```

core/src/main/scala/org/apache/spark/status/LiveEntity.scala

Lines changed: 1 addition & 0 deletions

```
@@ -201,6 +201,7 @@ private class LiveTask(
       info.taskId,
       info.index,
       info.attemptNumber,
+      info.partitionId,
       info.launchTime,
       if (info.gettingResult) info.gettingResultTime else -1L,
       duration,
```

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 1 addition & 0 deletions

```
@@ -308,6 +308,7 @@ class TaskData private[spark](
     val taskId: Long,
     val index: Int,
     val attempt: Int,
+    val partitionId: Int,
     val launchTime: Date,
     val resultFetchStart: Option[Date],
     @JsonDeserialize(contentAs = classOf[JLong])
```
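Since `v1.TaskData` backs the REST API, the field also surfaces in task-level responses. A hedged sketch of fetching it, where the host, port, application ID, and stage IDs are placeholders (the `taskList` endpoint itself predates this change):

```
import scala.io.Source

// Placeholder URL: a live driver UI on the default port, app "app-0001",
// stage 0, stage attempt 0.
val url = "http://localhost:4040/api/v1/applications/app-0001/stages/0/0/taskList"
// Each task object in the returned JSON array now carries "partitionId"
// next to "taskId", "index" and "attempt"; -1 marks pre-3.3 history data.
println(Source.fromURL(url).mkString)
```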

core/src/main/scala/org/apache/spark/status/storeTypes.scala

Lines changed: 6 additions & 0 deletions

```
@@ -141,6 +141,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val TASK_PARTITION_ID = "partid"
   final val COMPLETION_TIME = "ct"
 }
 
@@ -161,6 +162,10 @@ private[spark] class TaskDataWrapper(
     val index: Int,
     @KVIndexParam(value = TaskIndexNames.ATTEMPT, parent = TaskIndexNames.STAGE)
     val attempt: Int,
+    @KVIndexParam(value = TaskIndexNames.TASK_PARTITION_ID, parent = TaskIndexNames.STAGE)
+    // Different kvstores have different default values (e.g. 0 or -1),
+    // thus we use an explicit default value here for backwards compatibility.
+    val partitionId: Int = -1,
     @KVIndexParam(value = TaskIndexNames.LAUNCH_TIME, parent = TaskIndexNames.STAGE)
     val launchTime: Long,
     val resultFetchStart: Long,
@@ -286,6 +291,7 @@ private[spark] class TaskDataWrapper(
       taskId,
       index,
       attempt,
+      partitionId,
       new Date(launchTime),
       if (resultFetchStart > 0L) Some(new Date(resultFetchStart)) else None,
       if (duration > 0L) Some(duration) else None,
```
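The explicit `-1` default matters because `TaskDataWrapper` rows written by an older Spark have no `partitionId` column, and what an absent `Int` comes back as differs between kvstore backends. A toy illustration of the idea (hypothetical types, not the kvstore's actual deserialization path):

```
// Hypothetical stored row from a pre-3.3 history file: no partitionId key.
case class StoredTask(taskId: Long, attempt: Int, partitionId: Int = -1)

val oldRow = Map[String, Any]("taskId" -> 7L, "attempt" -> 0)
val task = StoredTask(
  taskId = oldRow("taskId").asInstanceOf[Long],
  attempt = oldRow("attempt").asInstanceOf[Int])
// The constructor default, not a backend-specific zero value, fills the gap.
assert(task.partitionId == -1)
```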

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 5 additions & 2 deletions

```
@@ -317,6 +317,7 @@ private[spark] object JsonProtocol {
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
     ("Attempt" -> taskInfo.attemptNumber) ~
+    ("Partition ID" -> taskInfo.partitionId) ~
     ("Launch Time" -> taskInfo.launchTime) ~
     ("Executor ID" -> taskInfo.executorId) ~
     ("Host" -> taskInfo.host) ~
@@ -916,6 +917,7 @@ private[spark] object JsonProtocol {
     val taskId = (json \ "Task ID").extract[Long]
     val index = (json \ "Index").extract[Int]
     val attempt = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
+    val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
     val launchTime = (json \ "Launch Time").extract[Long]
     val executorId = weakIntern((json \ "Executor ID").extract[String])
     val host = weakIntern((json \ "Host").extract[String])
@@ -930,8 +932,9 @@ private[spark] object JsonProtocol {
       case None => Seq.empty[AccumulableInfo]
     }
 
-    val taskInfo =
-      new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
+    val taskInfo = new TaskInfo(
+      taskId, index, attempt, partitionId, launchTime,
+      executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
```
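The same defaulting idea protects event-log replay: logs written before this change carry no "Partition ID" field, and the reader falls back to `-1`. A self-contained sketch of that pattern with json4s, where `jsonOption` is a local stand-in for JsonProtocol's private helper:

```
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats: Formats = DefaultFormats

// Stand-in for JsonProtocol's private jsonOption: treat JNothing as absent.
def jsonOption(json: JValue): Option[JValue] = json match {
  case JNothing => None
  case value => Some(value)
}

// A task event from a pre-3.3 event log: no "Partition ID" field.
val oldEvent = parse("""{"Task ID": 42, "Index": 3, "Attempt": 0}""")
val partitionId =
  jsonOption(oldEvent \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
assert(partitionId == -1)  // old logs replay with the -1 sentinel
```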

core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json

Lines changed: 15 additions & 0 deletions

```
@@ -1,4 +1,19 @@
 [ {
+  "id" : "local-1642039451826",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2022-01-13T02:04:10.519GMT",
+    "endTime" : "2022-01-13T02:05:36.564GMT",
+    "lastUpdated" : "",
+    "duration" : 86045,
+    "sparkUser" : "lijunqing",
+    "completed" : true,
+    "appSparkVersion" : "3.3.0-SNAPSHOT",
+    "startTimeEpoch" : 1642039450519,
+    "endTimeEpoch" : 1642039536564,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1628109047826_1317105",
   "name" : "Spark shell",
   "attempts" : [ {
```

core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json

Lines changed: 15 additions & 0 deletions

```
@@ -1,4 +1,19 @@
 [ {
+  "id" : "local-1642039451826",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2022-01-13T02:04:10.519GMT",
+    "endTime" : "2022-01-13T02:05:36.564GMT",
+    "lastUpdated" : "",
+    "duration" : 86045,
+    "sparkUser" : "lijunqing",
+    "completed" : true,
+    "appSparkVersion" : "3.3.0-SNAPSHOT",
+    "startTimeEpoch" : 1642039450519,
+    "endTimeEpoch" : 1642039536564,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1628109047826_1317105",
   "name" : "Spark shell",
   "attempts" : [ {
```

core/src/test/resources/HistoryServerExpectations/excludeOnFailure_for_stage_expectation.json

Lines changed: 12 additions & 0 deletions

```
@@ -46,6 +46,7 @@
   "taskId" : 0,
   "index" : 0,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.347GMT",
   "duration" : 562,
   "executorId" : "0",
@@ -100,6 +101,7 @@
   "taskId" : 5,
   "index" : 3,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.958GMT",
   "duration" : 22,
   "executorId" : "1",
@@ -153,6 +155,7 @@
   "taskId" : 10,
   "index" : 8,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:19.034GMT",
   "duration" : 12,
   "executorId" : "1",
@@ -206,6 +209,7 @@
   "taskId" : 1,
   "index" : 1,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.364GMT",
   "duration" : 565,
   "executorId" : "1",
@@ -259,6 +263,7 @@
   "taskId" : 6,
   "index" : 4,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.980GMT",
   "duration" : 16,
   "executorId" : "1",
@@ -312,6 +317,7 @@
   "taskId" : 9,
   "index" : 7,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:19.022GMT",
   "duration" : 12,
   "executorId" : "1",
@@ -365,6 +371,7 @@
   "taskId" : 2,
   "index" : 2,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.899GMT",
   "duration" : 27,
   "executorId" : "0",
@@ -419,6 +426,7 @@
   "taskId" : 7,
   "index" : 5,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.996GMT",
   "duration" : 15,
   "executorId" : "1",
@@ -472,6 +480,7 @@
   "taskId" : 3,
   "index" : 0,
   "attempt" : 1,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.919GMT",
   "duration" : 24,
   "executorId" : "1",
@@ -525,6 +534,7 @@
   "taskId" : 11,
   "index" : 9,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:19.045GMT",
   "duration" : 15,
   "executorId" : "1",
@@ -578,6 +588,7 @@
   "taskId" : 8,
   "index" : 6,
   "attempt" : 0,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:19.011GMT",
   "duration" : 11,
   "executorId" : "1",
@@ -631,6 +642,7 @@
   "taskId" : 4,
   "index" : 2,
   "attempt" : 1,
+  "partitionId" : -1,
   "launchTime" : "2018-01-09T10:21:18.943GMT",
   "duration" : 16,
   "executorId" : "1",
```
