
Conversation

@jackylee-ch
Contributor

Why are the changes needed?

There is no partition id in the current task metrics. It is difficult to track the task metrics of a specific partition or the stage metrics of processing data, especially when the stage was retried.

class TaskData private[spark](
    val taskId: Long,
    val index: Int,
    val attempt: Int,
    val launchTime: Date,
    val resultFetchStart: Option[Date],
    @JsonDeserialize(contentAs = classOf[JLong])
    val duration: Option[Long],
    val executorId: String,
    val host: String,
    val status: String,
    val taskLocality: String,
    val speculative: Boolean,
    val accumulatorUpdates: Seq[AccumulableInfo],
    val errorMessage: Option[String] = None,
    val taskMetrics: Option[TaskMetrics] = None,
    val executorLogs: Map[String, String],
    val schedulerDelay: Long,
    val gettingResultTime: Long) 

Does this PR introduce any user-facing change?

no

How was this patch tested?

add new tests

@github-actions github-actions bot added the CORE label Jan 13, 2022
@github-actions github-actions bot added the BUILD label Jan 13, 2022
@mridulm
Contributor

mridulm commented Jan 13, 2022

For the specific purpose detailed, use index ?
Scratch that - misread the description.

@mridulm
Contributor

mridulm commented Jan 13, 2022

+CC @tgravescs, @Ngone51
I am surprised this is an issue ... want to make sure I am not missing something here !

@jackylee-ch
Contributor Author

The main purpose of this issue is to track the task metrics of a specific RDD partition, especially input/shuffle reads and output/shuffle writes.
It is very useful when a shuffle data difference is found; we can use it to check how the data was computed.
Also, it will be helpful to show the actual computed data size or record count in the SQL UI.
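To illustrate the use case (a hypothetical consumer-side sketch, not part of this PR): once each entry in the history server's task list carries a partitionId, per-partition shuffle-read sizes can be aggregated even across stage attempts. The metric field names below follow Spark's REST API, but the helper function and sample data are illustrative assumptions.

```python
from collections import defaultdict

def shuffle_read_by_partition(tasks):
    """Aggregate shuffle-read bytes per RDD partition.

    `tasks` is an iterable of task dicts shaped like the history server's
    REST API task entries (a hypothetical consumer, not Spark code).
    """
    totals = defaultdict(int)
    for task in tasks:
        if task.get("status") != "SUCCESS":
            continue  # count only the attempt that actually produced the data
        pid = task.get("partitionId", -1)  # -1 when written by an older Spark
        shuffle = (task.get("taskMetrics") or {}).get("shuffleReadMetrics") or {}
        totals[pid] += shuffle.get("remoteBytesRead", 0) + shuffle.get("localBytesRead", 0)
    return dict(totals)

# Two attempts for partition 0 (one failed), one task for partition 1.
tasks = [
    {"taskId": 1, "partitionId": 0, "status": "SUCCESS",
     "taskMetrics": {"shuffleReadMetrics": {"remoteBytesRead": 100, "localBytesRead": 20}}},
    {"taskId": 2, "partitionId": 0, "status": "FAILED",
     "taskMetrics": {"shuffleReadMetrics": {"remoteBytesRead": 50, "localBytesRead": 0}}},
    {"taskId": 3, "partitionId": 1, "status": "SUCCESS",
     "taskMetrics": {"shuffleReadMetrics": {"remoteBytesRead": 30, "localBytesRead": 5}}},
]
print(shuffle_read_by_partition(tasks))  # {0: 120, 1: 35}
```

Grouping by partitionId rather than index is what makes the aggregation stable across retried stage attempts.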

@HyukjinKwon
Member

FWIW, there's a bug on updating the status in the check (#35179 (comment)). The tests in this PR look to have passed (https://github.com/stczwd/spark/runs/4799251925).

@jackylee-ch jackylee-ch changed the title [SPARK-37831][CORE][WIP] add task partition id in TaskInfo and Task Metrics [SPARK-37831][CORE] add task partition id in TaskInfo and Task Metrics Jan 13, 2022
@jackylee-ch
Contributor Author

cc @dongjoon-hyun @srowen @cloud-fan

@tgravescs
Contributor

Yeah, generally index = partition id on the first execution, but that doesn't hold on retries. I remember running into this a few times, but it was never a huge issue; seems like a good thing to add.
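A toy illustration of the retry case (partition numbers made up for this example): when a stage attempt recomputes only a subset of partitions, the task index is assigned within that subset, so index stops matching the RDD partition id.

```python
# Made-up numbers for illustration: a retried stage attempt re-runs only the
# partitions whose outputs were lost, and the task index is assigned within
# that subset, so index no longer lines up with the RDD partition id.
retried_partitions = [83, 98, 157]  # the subset that must be recomputed
retry_tasks = [{"index": i, "partitionId": pid}
               for i, pid in enumerate(retried_partitions)]

for t in retry_tasks:
    print(t)
# {'index': 0, 'partitionId': 83}
# {'index': 1, 'partitionId': 98}
# {'index': 2, 'partitionId': 157}
```

Any tooling that joins metrics on `index` would attribute the retry's metrics to partitions 0, 1 and 2 instead of 83, 98 and 157.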

@mridulm
Contributor

mridulm commented Jan 13, 2022

@tgravescs Agree, for the first stage attempt this should be fine.
But for any stage attempt which is computing a subset of tasks (retries, specific partition computation via toLocalIterator, etc), this is a concern.

An interesting question would be how to handle this for skewed joins - thoughts @cloud-fan ?

Tagged folks since I wanted to make sure I was not missing something here :-)

@mridulm
Contributor

mridulm commented Jan 13, 2022

This does look like important missing functionality, thanks for identifying and working on it @stczwd! Given the size of the PR, I will try to circle back to it over the weekend.

Contributor

@mridulm mridulm left a comment


Took an initial pass through the PR and added some comments - overall looks good.
We would need to make sure that skew join and partition coalescing in SQL interact well with this change.

index,
attempt,
partitionId,
new Date(launchTime),
Contributor


+CC @thejdeep, please take a look to make sure there are no backward-incompatible changes.
Existing event files and/or LevelDB stores should still be readable after this change.

Contributor Author

@jackylee-ch jackylee-ch Jan 17, 2022


I have tested it with event files, LevelDB and RocksDB. It is OK with RocksDB and event files, which return "partitionId" : -1,. But when testing LevelDB, we get "partitionId" : 0,, which is different from what we expected.

BTW, the test scenario is generating the eventLog and LevelDB/RocksDB with an old spark-client and reading them with a new spark-client that has this PR.

Contributor

@mridulm mridulm Jan 18, 2022


Would be good to explore how we can default the value to -1 if missing.
Else cleaning up all level-db would be a prerequisite to deploying this.

I am not sure what is the cleanest way to enforce incompatible leveldb format changes - IIRC you had made some changes recently which require regen, right @dongjoon-hyun ? Any suggestions ? (IIRC we did not bump up the store version)


Contributor Author


@dongjoon-hyun Any thoughts about this?

Contributor Author


Could you double-check again? @jackylee-ch

@Ngone51 Thanks for the double check. It works fine when reading old eventLog JSON files with the new spark-client. The reproduction scenario for this problem is using the new spark-client to read old LevelDB data.

"7" : {
      "taskId" : 7,
      "index" : 1,
      "attempt" : 0,
      "partitionId" : 0,
      ...
}

Member


Could you give the reproduction steps? @jackylee-ch

Contributor Author


First, use the old spark-client to generate the eventLog and LevelDB with these configs:

spark.history.fs.logDirectory       /tmp/spark-client/spark-old/eventLog
spark.history.store.path            /tmp/spark-client/spark-old/kvstore
spark.history.store.hybridStore.diskBackend     LEVELDB

Then, use the new spark-client to read the eventLog and LevelDB with the configs from step 1.

These are the reproduction steps and configs that I used. @Ngone51

Member


Thanks, I have reproduced the issue with the above steps, and now I see where the problem is. Basically, the fix in JsonProtocol only solves the compatibility issue when reading event files. But when reading from LevelDB, there is no compatibility conversion. I tried setting the partitionId of TaskDataWrapper to the default value -1 and it seems to work. Could you double-check? @jackylee-ch

Contributor Author


@Ngone51 Thanks for your advice. It works fine with the default value. I will add the default value -1 and some comments for it.
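For readers following along, the hazard being fixed here can be sketched in a few lines (a Python stand-in for the kvstore deserialization, not the actual Spark code): a record written before this change has no partitionId at all, and a bare numeric default silently yields 0, which is a valid partition id, so only an explicit -1 keeps "missing" distinguishable.

```python
import json

# A task record written by an old Spark has no "partitionId" key at all.
old_record = json.loads('{"taskId": 7, "index": 1, "attempt": 0}')

naive = old_record.get("partitionId", 0)      # indistinguishable from partition 0
explicit = old_record.get("partitionId", -1)  # clearly "not recorded"

print(naive, explicit)  # 0 -1
```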

"attempt" : 1,
"partitionId": -1,
"launchTime" : "2018-01-09T10:21:18.943GMT",
"duration" : 16,
Contributor


Since we expect this to default to -1, revert changes to this (and similar other) file ?
Similarly for Partition Id = -1 from json files as well.

Contributor Author


These JSON files are generated by the REST API. Unless we don't want to show partitionId for history data, they still need to be modified.

Contributor


Sounds good to keep it as-is.
We need to validate that a missing value defaults to -1 (for both JSON and LevelDB) to test the upgrade case (this is dependent on Dongjoon's suggestions above on how to mitigate the issue, though).

@jackylee-ch
Contributor Author

Took an initial pass through the PR and added some comments - overall looks good. We would need to make sure that skew join and partition coalescing in SQL interact well with this change.

Thanks for your reply. I have tested partition coalescing in SQL; it works well with this change.

@mridulm
Contributor

mridulm commented Jan 18, 2022

Took an initial pass through the PR and added some comments - overall looks good. We would need to make sure that skew join and partition coalescing in SQL interact well with this change.

Thanks for your reply. I have tested partition coalescing in SQL; it works well with this change.

What I want @cloud-fan, @dongjoon-hyun, and others who are more familiar with SQL to look at is: given that a single partition gets computed by multiple tasks (for skew), or multiple partitions get computed by a single task (for coalescing), what is the expectation (between the 'original' reducer stage and the executed reducer stage)? Also, whether this is compatible with the future evolution of SQL.

@jackylee-ch
Contributor Author

kindly ping @cloud-fan @HyukjinKwon @dongjoon-hyun @rdblue

Member

@Ngone51 Ngone51 left a comment


Generally looks good to me.

@jackylee-ch
Contributor Author

Kindly pinging @cloud-fan @HyukjinKwon @dongjoon-hyun.
Any more questions about this PR?

@tgravescs
Contributor

It would be good to make sure we test backwards compatibility on the history server, i.e. use the old history server to read a newer event log with this change.

@jackylee-ch
Contributor Author

It would be good to make sure we test backwards compatibility on the history server, i.e. use the old history server to read a newer event log with this change.

Thanks for your attention. Yes, we have already tested this with both the old and new history servers, generating the eventLog on one server and reading it from another; they all work fine with this change. Besides, we have already used it in our environment, which has different versions of servers, especially Spark 2.x.

Contributor

@mridulm mridulm left a comment


Couple of minor comments.
I wanted to confirm whether we have tests where task index != partitionId - for example stage resubmission case.

@jackylee-ch
Copy link
Contributor Author

I wanted to confirm whether we have tests where task index != partitionId - for example stage resubmission case.

Yes, we have already tested this. Here is a simple example of stage recomputation with a FetchFailed error.

{
  "status" : "FAILED",
  "stageId" : 39,
  "attemptId" : 1,
  "numTasks" : 407,
  "numActiveTasks" : 0,
  "numCompleteTasks" : 155,
  "numFailedTasks" : 77,
  "numKilledTasks" : 0,
  "numCompletedIndices" : 155,
  "executorRunTime" : 821532,
  "executorCpuTime" : 806253615510,
  "submissionTime" : "2022-03-01T12:36:32.018GMT",
  "firstTaskLaunchedTime" : "2022-03-01T12:36:32.028GMT",
  "completionTime" : "2022-03-01T12:37:33.643GMT",
  "failureReason" : "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1\n",
  "inputBytes" : 0,
  "inputRecords" : 0,
  "outputBytes" : 0,
  "outputRecords" : 0,
  "shuffleReadBytes" : 22927763615,
  "shuffleReadRecords" : 943893085,
  "shuffleWriteBytes" : 0,
  "shuffleWriteRecords" : 0,
  "memoryBytesSpilled" : 0,
  "diskBytesSpilled" : 0,
  "name" : "run at ThreadPoolExecutor.java:1149",
  "description" : "benchmark q24a-v2.4",
  "details" : "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)\njava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\njava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\njava.lang.Thread.run(Thread.java:748)",
  "schedulingPool" : "default",
  "rddIds" : [ 93, 92, 91, 90, 89 ],
  "accumulatorUpdates" : [ ],
  "tasks" : {
    "14343" : {
      "taskId" : 14343,
      "index" : 65,
      "attempt" : 0,
      "partitionId" : 157,
      "launchTime" : "2022-03-01T12:36:32.031GMT",
      "duration" : 5475,
      "executorId" : "5",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14367" : {
      "taskId" : 14367,
      "index" : 89,
      "attempt" : 0,
      "partitionId" : 182,
      "launchTime" : "2022-03-01T12:36:37.415GMT",
      "duration" : 5322,
      "executorId" : "8",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14404" : {
      "taskId" : 14404,
      "index" : 126,
      "attempt" : 0,
      "partitionId" : 219,
      "launchTime" : "2022-03-01T12:36:37.900GMT",
      "duration" : 5246,
      "executorId" : "7",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14463" : {
      "taskId" : 14463,
      "index" : 185,
      "attempt" : 0,
      "partitionId" : 278,
      "launchTime" : "2022-03-01T12:36:42.840GMT",
      "duration" : 103059,
      "executorId" : "6",
      "host" : "",
      "status" : "FAILED",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "errorMessage" : "FetchFailed",
      "taskMetrics" : {
      }
    },
    "14305" : {
      "taskId" : 14305,
      "index" : 27,
      "attempt" : 0,
      "partitionId" : 98,
      "launchTime" : "2022-03-01T12:36:32.030GMT",
      "duration" : 4910,
      "executorId" : "10",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14358" : {
      "taskId" : 14358,
      "index" : 80,
      "attempt" : 0,
      "partitionId" : 173,
      "launchTime" : "2022-03-01T12:36:37.257GMT",
      "duration" : 5062,
      "executorId" : "9",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14419" : {
      "taskId" : 14419,
      "index" : 141,
      "attempt" : 0,
      "partitionId" : 234,
      "launchTime" : "2022-03-01T12:36:41.673GMT",
      "duration" : 4798,
      "executorId" : "10",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14399" : {
      "taskId" : 14399,
      "index" : 121,
      "attempt" : 0,
      "partitionId" : 214,
      "launchTime" : "2022-03-01T12:36:37.797GMT",
      "duration" : 5337,
      "executorId" : "7",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14296" : {
      "taskId" : 14296,
      "index" : 18,
      "attempt" : 0,
      "partitionId" : 83,
      "launchTime" : "2022-03-01T12:36:32.029GMT",
      "duration" : 4944,
      "executorId" : "10",
      "host" : "",
      "status" : "SUCCESS",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "taskMetrics" : {
      }
    },
    "14468" : {
      "taskId" : 14468,
      "index" : 190,
      "attempt" : 0,
      "partitionId" : 283,
      "launchTime" : "2022-03-01T12:36:43.134GMT",
      "duration" : 103319,
      "executorId" : "7",
      "host" : "",
      "status" : "FAILED",
      "taskLocality" : "PROCESS_LOCAL",
      "speculative" : false,
      "accumulatorUpdates" : [ ],
      "errorMessage" : "FetchFailed",
      "taskMetrics" : {
      }
    }
....

@jackylee-ch
Contributor Author

Would be good to explore how we can default the value to -1 if missing.
Else cleaning up all level-db would be a prerequisite to deploying this.

I am not sure what is the cleanest way to enforce incompatible leveldb format changes - IIRC you had made some changes recently which require regen, right @dongjoon-hyun ? Any suggestions ? (IIRC we did not bump up the store version)

ping @dongjoon-hyun. Could you take a look at the question above?

Contributor

@mridulm mridulm left a comment


Changes look good to me.
Would like to understand how to handle the format change impact based on @dongjoon-hyun's experiences though.

+CC @Ngone51, @cloud-fan as well.

@jackylee-ch
Contributor Author

Changes look good to me. Would like to understand how to handle the format change impact based on @dongjoon-hyun's experiences though.

@Ngone51 @mridulm Since the default value can be set, the previous problem no longer exists. Can this PR be merged?

@jackylee-ch
Contributor Author

Any more questions about this PR? Can it be merged now?

@jackylee-ch
Contributor Author

cc @cloud-fan @mridulm @Ngone51 @HyukjinKwon @MaxGekk GA has passed; can we merge this PR to master/3.3.0?

@cloud-fan
Contributor

thanks, merging to master/3.3!

@cloud-fan cloud-fan closed this in 13cfc5b Mar 31, 2022
cloud-fan pushed a commit that referenced this pull request Mar 31, 2022
### Why are the changes needed?
There is no partition id in the current task metrics. It is difficult to track the task metrics of a specific partition or the stage metrics of processing data, especially when the stage was retried.
```
class TaskData private[spark](
    val taskId: Long,
    val index: Int,
    val attempt: Int,
    val launchTime: Date,
    val resultFetchStart: Option[Date],
    @JsonDeserialize(contentAs = classOf[JLong])
    val duration: Option[Long],
    val executorId: String,
    val host: String,
    val status: String,
    val taskLocality: String,
    val speculative: Boolean,
    val accumulatorUpdates: Seq[AccumulableInfo],
    val errorMessage: Option[String] = None,
    val taskMetrics: Option[TaskMetrics] = None,
    val executorLogs: Map[String, String],
    val schedulerDelay: Long,
    val gettingResultTime: Long)
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
add new tests

Closes #35185 from jackylee-ch/SPARK-37831.

Lead-authored-by: stczwd <qcsd2011@163.com>
Co-authored-by: Jacky Lee <qcsd2011@gmail.com>
Co-authored-by: jackylee-ch <lijunqing@baidu.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 13cfc5b)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@jackylee-ch
Contributor Author

Thanks all.

@jackylee-ch jackylee-ch deleted the SPARK-37831 branch March 31, 2022 15:00
HyukjinKwon pushed a commit that referenced this pull request Jul 5, 2022
…quals method in JsonProtocolSuite

### What changes were proposed in this pull request?

In #35185, task partition id was added to TaskInfo. However, JsonProtocolSuite#assertEquals for TaskInfo does not check partitionId.

### Why are the changes needed?

assertEquals should also verify that partitionId matches.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No need to add unit test.

Closes #37081 from dcoliversun/SPARK-39676.

Authored-by: Qian.Sun <qian.sun2020@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>