[SPARK-11740][Streaming] Fix the race condition of two checkpoints in a batch #9707
Conversation
Test build #45902 has finished for PR 9707 at commit.
retest this please
Test build #45912 has finished for PR 9707 at commit.
/cc @tdas
@@ -699,11 +698,11 @@ def check_output(n):
    # Verify that getOrCreate() uses existing SparkContext
    self.ssc.stop(True, True)
    time.sleep(1)
    sc = SparkContext(SparkConf())
is this change needed for this PR?
Just to make sure tearDown can shut it down if there is any exception.
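In other words, the change preserves the usual cleanup guarantee. A minimal Scala analogue of that guard pattern (the actual test is Python; the object name, master URL, and app name here are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CleanupGuard {
  def main(args: Array[String]): Unit = {
    // Hold a reference to the context so it can always be stopped,
    // even if the test body throws (mirrors the Python tearDown guard).
    var sc: SparkContext = null
    try {
      sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("checkpoint-test"))
      // ... assertions that may throw ...
    } finally {
      if (sc != null) sc.stop() // cleanup still runs on failure
    }
  }
}
```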
LGTM. BTW, I would like to merge this to older branches like 1.5 and 1.4 as well. @harishreedharan Can you also take a look at this?
// batch actually has the latest information, so we want to recover from it. Therefore, we
// also use the latest checkpoint time as the file name, so that we can recover from the
// latest checkpoint file.
val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
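For readers following along, a condensed, self-contained sketch of the naming logic under discussion (times modeled as Longs; not the verbatim Spark source):

```scala
// Condensed sketch: the writer remembers the newest checkpoint time it has
// seen and names every checkpoint file after that time, so a late-arriving
// write for an older batch still lands in the newest file.
object CheckpointNaming {
  private var latestCheckpointTime: Long = -1L

  def fileNameFor(checkpointTime: Long, checkpointDir: String): String = {
    if (latestCheckpointTime < checkpointTime) {
      latestCheckpointTime = checkpointTime
    }
    s"$checkpointDir/checkpoint-$latestCheckpointTime"
  }
}
```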
So the idea here is that if an older batch's completion-checkpoint comes in after a new batch's initial-checkpoint, we overwrite the initial checkpoint (since we would not reset latestCheckpointTime)?

This could essentially mean two checkpoints being written to the same files.
> This could essentially mean two checkpoints being written to the same files.

The previous file will be renamed to backupFile.
I don't think you get what I am saying. I am saying that two threads could run at the same time, writing out data to the exact same files.

If I am not mistaken, there is a bug here that could lead to 2 checkpoints running at the same time, writing to the same files:

-- Checkpoint 1: completion of batch time t
-- Checkpoint 2: start of batch time t+1

Checkpoint 2 starts -> latestCheckpoint = t + 1
Checkpoint 1 starts -> since latestCheckpoint != null and latestCheckpoint > checkpointTime, we would not reset latestCheckpoint, so both checkpoints would use the same file name to write their checkpoints out.

Because of this, whichever thread reaches the tempFile creation first wins, which is non-deterministic. The other thread would end up hitting an exception.
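A self-contained sketch of this interleaving (times modeled as Longs; names illustrative, not the Spark source) — both calls resolve to the same file name:

```scala
// If these two calls could run concurrently, they would race on one file.
object CheckpointRaceSketch {
  @volatile private var latestCheckpointTime: Long = -1L

  private def fileNameFor(checkpointTime: Long): String = {
    if (latestCheckpointTime < checkpointTime) latestCheckpointTime = checkpointTime
    s"checkpoint-$latestCheckpointTime"
  }

  def main(args: Array[String]): Unit = {
    println(fileNameFor(2000L)) // Checkpoint 2 (start of batch t+1): checkpoint-2000
    println(fileNameFor(1000L)) // Checkpoint 1 (completion of batch t): also checkpoint-2000
  }
}
```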
There is only one thread writing the checkpoint file. See val executor = Executors.newFixedThreadPool(1) in CheckpointWriter.
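For clarity, a small runnable sketch of why a fixed pool of size 1 rules the race out (writeCheckpoint is a hypothetical stand-in for the real write handler):

```scala
import java.util.concurrent.Executors

object SerializedWrites {
  // Hypothetical stand-in for the real write handler.
  def writeCheckpoint(batchTime: Long): Unit =
    println(s"writing checkpoint for batch $batchTime")

  def main(args: Array[String]): Unit = {
    // A fixed pool of size 1 runs tasks strictly one at a time, in submission
    // order, so two checkpoint writes can never touch the same file concurrently.
    val executor = Executors.newFixedThreadPool(1)
    executor.execute(() => writeCheckpoint(2000L)) // runs first
    executor.execute(() => writeCheckpoint(1000L)) // runs only after the first completes
    executor.shutdown() // previously-submitted tasks still run to completion
  }
}
```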
Ok, then we are fine. Can you put in a comment where the executor is being created, so we don't end up causing a bug due to this class not being thread-safe.
Updated
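The added comment presumably looks something like this sketch (illustrative wording, not a quote from the PR): record at the creation site that the single thread is a correctness requirement, not an optimization.

```scala
// CheckpointWriter is not thread-safe: the checkpoint file name depends on
// mutable state (latestCheckpointTime), so all writes must be serialized
// through this single-threaded executor. Do not increase the pool size.
val executor = Executors.newFixedThreadPool(1)
```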
LGTM. Thanks @zsxwing!
Added a unit test using a mock.
Test build #46113 has finished for PR 9707 at commit.
Thanks @zsxwing and @harishreedharan. Merging this to master, 1.6 and 1.5.
[SPARK-11740][Streaming] Fix the race condition of two checkpoints in a batch

We checkpoint both when generating a batch and when completing a batch. When the processing time of a batch is greater than the batch interval, checkpointing for completing an old batch may run after checkpointing for generating a new batch. If this happens, the checkpoint of the old batch actually has the latest information, so we want to recover from it. This PR uses the latest checkpoint time as the file name, so that we can always recover from the latest checkpoint file.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9707 from zsxwing/fix-checkpoint.

(cherry picked from commit 928d631)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
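A minimal sketch (hypothetical helper, not the Spark API) of why naming files after the latest checkpoint time makes recovery pick up the newest state:

```scala
object RecoverLatest {
  // Hypothetical helper: recovery can pick the newest checkpoint simply by
  // sorting file names on their embedded time and taking the last one.
  def latestCheckpointFile(fileNames: Seq[String]): Option[String] =
    fileNames
      .filter(_.startsWith("checkpoint-"))
      .sortBy(_.stripPrefix("checkpoint-").toLong)
      .lastOption

  def main(args: Array[String]): Unit =
    // With the fix, the file for the latest time is always written last,
    // so the name sort and the write order agree.
    println(latestCheckpointFile(Seq("checkpoint-1000", "checkpoint-2000"))) // Some(checkpoint-2000)
}
```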