[SPARK-24552][SQL] Use task ID instead of attempt number for v2 writes. #21558

Closed
@@ -29,10 +29,8 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
-import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
+import org.apache.spark.sql.execution.streaming.MicroBatchExecution
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

@@ -110,7 +108,7 @@ object DataWritingSparkTask extends Logging {
       useCommitCoordinator: Boolean): WriterCommitMessage = {
     val stageId = context.stageId()
     val partId = context.partitionId()
-    val attemptId = context.attemptNumber()
+    val attemptId = context.taskAttemptId().toInt
@squito (Contributor), Jun 19, 2018:

nit: can you rename this variable to tid? these names are pretty confusing, but I think that at least "tid" is used consistently and exclusively for this, while "attempt" means a lot of different things.
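The distinction the PR rests on: `attemptNumber()` restarts at 0 for every retry of a partition and can repeat across stage attempts, while `taskAttemptId()` (the TID) is a counter that never repeats within a SparkContext. A minimal sketch of why keying writer output by `(partitionId, attemptNumber)` can collide where the TID cannot — illustrative model only, not Spark internals:

```scala
// Toy model of two runs of the same partition: one from a zombie stage
// attempt, one from the retried stage. Both carry attemptNumber 0.
case class TaskRun(stageAttempt: Int, partitionId: Int, attemptNumber: Int, tid: Long)

val runs = Seq(
  TaskRun(stageAttempt = 0, partitionId = 3, attemptNumber = 0, tid = 7L),
  TaskRun(stageAttempt = 1, partitionId = 3, attemptNumber = 0, tid = 12L)
)

// Keying by (partitionId, attemptNumber) collides: both runs look identical.
val byAttempt = runs.map(r => (r.partitionId, r.attemptNumber)).distinct
assert(byAttempt.size == 1)

// Keying by TID distinguishes them.
val byTid = runs.map(_.tid).distinct
assert(byTid.size == 2)
```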

Contributor:

I was going to suggest removing the cast to int, but that's in DataWriterFactory, so changing it would be an API breakage... hopefully it won't cause issues aside from weird output names when the value overflows the int.
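The overflow being discussed: `taskAttemptId()` returns a `Long`, but `DataWriterFactory.createDataWriter` takes an `Int`, so the `.toInt` cast silently wraps once the global counter passes `Int.MaxValue`. A quick sketch of what that truncation produces:

```scala
// What the .toInt cast does once the TID counter passes Int.MaxValue:
// the value wraps to a negative Int, so output names derived from it
// become confusing, though they stay distinct within a single job.
val tid: Long = Int.MaxValue.toLong + 1  // 2147483648
val truncated: Int = tid.toInt
assert(truncated == Int.MinValue)        // wrapped to -2147483648
```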

Contributor:

HadoopWriteConfigUtil has the same issue: it's a public interface and uses an int for the attempt number.
It seems somewhat unlikely, but it's more likely for task IDs to go over an int in Spark than in, say, MapReduce. We do have partitionId as an Int, so if partition counts get close to Int and you have task failures, then task IDs could go over an Int. Looking at our options

     val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
     val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)
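The epoch-id line above has a fallback worth noting: batch (non-streaming) queries never set `BATCH_ID_KEY` as a local property, so `getLocalProperty` returns null and the writer defaults to epoch 0. A small stand-in for that lookup, with `epochIdFor` as an illustrative name:

```scala
// Models Option(context.getLocalProperty(...)).getOrElse("0").toLong:
// null (property unset, i.e. a batch query) falls back to epoch 0.
def epochIdFor(localProperty: String): Long =
  Option(localProperty).getOrElse("0").toLong

assert(epochIdFor(null) == 0L)   // batch query: property unset
assert(epochIdFor("42") == 42L)  // micro-batch 42 of a streaming query
```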

@@ -124,10 +122,12 @@
       val coordinator = SparkEnv.get.outputCommitCoordinator
       val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
       if (commitAuthorized) {
-        logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+        logInfo(
+          s"Writer for stage $stageId, task $partId (TID $attemptId) is authorized to commit.")
         dataWriter.commit()
       } else {
-        val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
+        val message =
+          s"Stage $stageId, task $partId (TID $attemptId): driver did not authorize commit"
         logInfo(message)
         // throwing CommitDeniedException will trigger the catch block for abort
         throw new CommitDeniedException(message, stageId, partId, attemptId)
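The `canCommit` call in this hunk is the driver-side arbitration that makes the ID choice matter: only one attempt per `(stage, partition)` may commit, and later askers are denied. A simplified model of that protocol (a toy, not Spark's OutputCommitCoordinator):

```scala
import scala.collection.mutable

// First attempt of a (stage, partition) to ask wins; every other attempt
// of the same pair is denied, so exactly one writer commits its output.
class ToyCommitCoordinator {
  private val winners = mutable.Map.empty[(Int, Int), Long]

  def canCommit(stageId: Int, partitionId: Int, tid: Long): Boolean =
    synchronized {
      winners.get((stageId, partitionId)) match {
        case Some(winner) => winner == tid  // only the winner may re-ask
        case None =>
          winners((stageId, partitionId)) = tid
          true
      }
    }
}

val coord = new ToyCommitCoordinator
assert(coord.canCommit(stageId = 1, partitionId = 3, tid = 7L))    // first asker wins
assert(!coord.canCommit(stageId = 1, partitionId = 3, tid = 12L))  // speculative copy denied
```

A denied task throws CommitDeniedException, as the diff shows, which routes it into the abort path instead of failing the job.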
@@ -138,15 +138,15 @@
         dataWriter.commit()
       }

-      logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.")
+      logInfo(s"Writer for stage $stageId, task $partId (TID $attemptId) committed.")

       msg

     })(catchBlock = {
       // If there is an error, abort this writer
-      logError(s"Writer for stage $stageId, task $partId.$attemptId is aborting.")
+      logError(s"Writer for stage $stageId, task $partId (TID $attemptId) is aborting.")
       dataWriter.abort()
-      logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.")
+      logError(s"Writer for stage $stageId, task $partId (TID $attemptId) aborted.")
     })
   }
 }