[SPARK-11740][Streaming]Fix the race condition of two checkpoints in a batch #9707
@@ -187,16 +187,30 @@ class CheckpointWriter(
   private var stopped = false
   private var fs_ : FileSystem = _
 
+  @volatile private var latestCheckpointTime: Time = null
+
   class CheckpointWriteHandler(
       checkpointTime: Time,
       bytes: Array[Byte],
       clearCheckpointDataLater: Boolean) extends Runnable {
     def run() {
+      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+        latestCheckpointTime = checkpointTime
+      }
       var attempts = 0
       val startTime = System.currentTimeMillis()
       val tempFile = new Path(checkpointDir, "temp")
-      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
-      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+      // We will do checkpoint when generating a batch and completing a batch. When the processing
+      // time of a batch is greater than the batch interval, checkpointing for completing an old
+      // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
+      // batch actually has the latest information, so we want to recover from it. Therefore, we
+      // also use the latest checkpoint time as the file name, so that we can recover from the
+      // latest checkpoint file.
+      //
+      // Note: there is only one thread writing the checkpoint files, so we don't need to worry
+      // about thread-safety.
+      val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
So the idea here is that if an older batch's completion-checkpoint comes in after a new batch's initial-checkpoint, we overwrite the initial checkpoint (since we would not reset the […]). This actually could essentially mean two checkpoints being written to the same files.

The previous file will be renamed to […]

I don't think you get what I am saying. I am saying that two threads could run at the same time writing out data to the exact same files. If I am not mistaken, there is a bug here that could lead to 2 checkpoints running at the same time, writing to the same files. Checkpoint 2 starts -> […] Because of this, depending on which thread reaches the tempFile creation first, that would win, which is non-deterministic. The other thread would end up hitting an exception.

There is only one thread writing the checkpoint file. See […]

Ok, then we are fine. Can you put in a comment where the executor is being created, so we don't end up causing a bug due to this class not being thread-safe.

Updated
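The point settled in this thread, that a single writer thread makes overlapping writes impossible, can be sketched as follows. This is only an illustration under assumptions: the use of `Executors.newFixedThreadPool(1)`, the object name `SingleWriterSketch`, and the `runSequentially` helper are all hypothetical stand-ins and are not taken from the PR.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch, not the PR's code: tasks submitted to a one-thread
// executor run one after another, so two "writes" never overlap.
object SingleWriterSketch {
  // Submits `n` tasks and returns the sequence of start/end events observed.
  def runSequentially(n: Int): Seq[String] = {
    // Mutated only by the single worker thread; read after termination.
    val events = ArrayBuffer.empty[String]
    val executor = Executors.newFixedThreadPool(1) // assumption: one writer thread
    (1 to n).foreach { i =>
      executor.execute(new Runnable {
        def run(): Unit = {
          events += s"start-$i"
          Thread.sleep(10) // stand-in for writing the temp file and renaming it
          events += s"end-$i"
        }
      })
    }
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
    events.toSeq
  }
}
```

With one worker thread, every `start-i` is followed directly by its own `end-i`. With a larger pool the events could interleave, which is the scenario the reviewer was worried about.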
+      val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)

       while (attempts < MAX_ATTEMPTS && !stopped) {
         attempts += 1
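The effect of naming files by the latest checkpoint time, rather than by the handler's own time, can be shown with a minimal sketch. Everything here is an assumption made for illustration: the `Writer` class, the `fileNameFor` method, the use of `Long` milliseconds in place of `Time`, and the `checkpoint-<time>` naming scheme are hypothetical and not taken from the PR.

```scala
// Hypothetical sketch of the naming rule in the hunk above, not the PR's code.
object LatestCheckpointNameSketch {
  final class Writer {
    // Newest checkpoint time seen so far; None until the first handler runs.
    @volatile private var latestCheckpointTime: Option[Long] = None

    // Returns the file name a handler for `checkpointTime` would write to.
    def fileNameFor(checkpointTime: Long): String = {
      // Only move the latest time forward, never backward.
      if (latestCheckpointTime.forall(_ < checkpointTime)) {
        latestCheckpointTime = Some(checkpointTime)
      }
      // Assumed naming scheme, for illustration only.
      s"checkpoint-${latestCheckpointTime.get}"
    }
  }
}
```

If the checkpoint for generating batch 2000 runs first and the checkpoint for completing batch 1000 runs afterwards, both calls resolve to `checkpoint-2000`. The late write therefore replaces the newest file instead of creating an older-named file that recovery would overlook.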
Is this change needed for this PR?

Just to make sure tearDown can shut it down if there is any exception.