
[SPARK-11740][Streaming]Fix the race condition of two checkpoints in a batch #9707


Closed

zsxwing wants to merge 3 commits into master from zsxwing:fix-checkpoint

Conversation

@zsxwing (Member) commented Nov 13, 2015

We write a checkpoint when generating a batch and when completing a batch. When the processing time of a batch is greater than the batch interval, checkpointing for the completion of an old batch may run after checkpointing for the generation of a new batch. If this happens, the checkpoint of the old batch actually has the latest information, so we want to recover from it. This PR uses the latest checkpoint time as the file name, so that we can always recover from the latest checkpoint file.
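To make the mechanism concrete, here is a minimal, self-contained sketch of the idea under stated assumptions: the `Time` case class and `checkpointFileFor` below are hypothetical stand-ins, not the actual `CheckpointWriter` code. The latest checkpoint time only ever moves forward, and every write derives its file name from that latest time.

```scala
import java.io.File

object LatestCheckpointTimeSketch {
  // Hypothetical stand-in for the streaming Time type.
  case class Time(milliseconds: Long)

  // Largest checkpoint time seen so far; it only ever moves forward.
  private var latestCheckpointTime: Option[Time] = None

  // Derive the checkpoint file from the latest time, not the (possibly older) batch time.
  def checkpointFileFor(checkpointDir: String, checkpointTime: Time): File = {
    if (latestCheckpointTime.forall(_.milliseconds < checkpointTime.milliseconds)) {
      latestCheckpointTime = Some(checkpointTime)
    }
    new File(checkpointDir, s"checkpoint-${latestCheckpointTime.get.milliseconds}")
  }

  def main(args: Array[String]): Unit = {
    // Batch t+1 (2000 ms) is generated and checkpointed first ...
    println(checkpointFileFor("/tmp/ckpt", Time(2000))) // /tmp/ckpt/checkpoint-2000
    // ... then the completion checkpoint of the older batch t (1000 ms) arrives late,
    // but it still targets the latest file name instead of rolling back to 1000.
    println(checkpointFileFor("/tmp/ckpt", Time(1000))) // /tmp/ckpt/checkpoint-2000
  }
}
```

The second call illustrates the fix: a late checkpoint for the older batch still lands in the newest file, so recovery always picks up the most recent state.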

@SparkQA commented Nov 13, 2015

Test build #45902 has finished for PR 9707 at commit 494696e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member, Author) commented Nov 14, 2015

retest this please

@SparkQA commented Nov 14, 2015

Test build #45912 has finished for PR 9707 at commit 494696e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member, Author) commented Nov 14, 2015

/cc @tdas

@@ -699,11 +698,11 @@ def check_output(n):
# Verify that getOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
sc = SparkContext(SparkConf())
Contributor:

is this change needed for this PR?

Member Author:

Just to make sure tearDown can shut it down if there is any exception.

@tdas (Contributor) commented Nov 17, 2015

LGTM.
I wonder whether we can add a unit test to test this behavior. We could create a CheckpointWriter with a mock JobGenerator, and then test whether the latest checkpoint time is respected.

BTW, I would like to merge this to older branches like 1.5 and 1.4 as well.

@harishreedharan Can you also take a look at this?

// batch actually has the latest information, so we want to recover from it. Therefore, we
// also use the latest checkpoint time as the file name, so that we can recover from the
// latest checkpoint file.
val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
Contributor:

So the idea here is that if an older batch's completion-checkpoint comes in after a new batch's initial-checkpoint, we overwrite the initial checkpoint (since we would not reset the latestCheckpointTime)?

This actually could essentially mean two checkpoints being written to the same files.

Member Author:

> This actually could essentially mean two checkpoints being written to the same files.

The previous file will be renamed to backupFile.

Contributor:

I don't think you get what I am saying. I am saying that two threads could run at the same time writing out data to the exact same files.

If I am not mistaken, there is a bug here that could lead to 2 checkpoints running at the same time, writing to the same files.
-- Checkpoint 1: Completion of Batch Time t
-- Checkpoint 2: Start of Batch Time t+1

Checkpoint 2 starts -> latestCheckpoint = t + 1
Checkpoint 1 starts -> since latestCheckpoint != null and latestCheckpoint > checkpointTime, we would not reset latestCheckpoint, so both checkpoints would use the same file name to write their checkpoints out.

Because of this, depending on which thread reaches the tempFile creation first, that would win - which is non-deterministic. The other thread would end up hitting an exception.

Member Author:

There is only one thread writing the checkpoint file. See val executor = Executors.newFixedThreadPool(1) in CheckpointWriter.
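For readers following along, here is a minimal sketch of the pattern that comment refers to; the object and task names are made up, and only the `Executors.newFixedThreadPool(1)` call mirrors the real code. A pool of size one serializes the submitted checkpoint tasks, so the two writes from the scenario above can never run concurrently.

```scala
import java.util.concurrent.Executors

object SingleWriterSketch {
  def main(args: Array[String]): Unit = {
    // A pool of size one serializes the writes: the second task cannot start
    // until the first has finished, so they never touch the same file at once.
    val executor = Executors.newFixedThreadPool(1)

    // Submit the "completion of batch t" and "start of batch t+1" checkpoints back to back.
    Seq("completion of batch t", "start of batch t+1").foreach { label =>
      executor.submit(new Runnable {
        override def run(): Unit =
          println(s"writing checkpoint for $label on ${Thread.currentThread().getName}")
      })
    }
    executor.shutdown()
  }
}
```

In other words, the thread-safety of the writer's mutable state comes from the single-threaded executor rather than from locking, which is why the reviewer then asks for a comment at the executor's creation site.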

Contributor:

Ok, then we are fine. Can you put in a comment where the executor is being created, so we don't end up causing a bug due to this class not being thread-safe.

Member Author:

Updated

@harishreedharan (Contributor):

LGTM. Thanks @zsxwing!

@zsxwing (Member, Author) commented Nov 17, 2015

Added a unit test using a mock.
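For readers curious what such a mock-based check can look like, here is a hedged, self-contained sketch under explicit assumptions: it uses Mockito on a made-up `CheckpointSink` trait and a made-up `LatestTimeTracker` carrying the same "latest time wins" rule, rather than the real `CheckpointWriter`/`JobGenerator` APIs, whose signatures are not reproduced here. This is not the test added in the PR.

```scala
import org.mockito.Mockito.{mock, times, verify}

// Hypothetical collaborator standing in for the file-writing side; the real test
// mocks Spark's JobGenerator instead.
trait CheckpointSink {
  def write(fileName: String): Unit
}

// Minimal tracker with the same "latest checkpoint time wins" rule as the PR.
class LatestTimeTracker(sink: CheckpointSink) {
  private var latest = Long.MinValue
  def checkpoint(time: Long): Unit = {
    if (time > latest) latest = time
    sink.write(s"checkpoint-$latest")
  }
}

object MockCheckpointTestSketch {
  def main(args: Array[String]): Unit = {
    val sink = mock(classOf[CheckpointSink])
    val tracker = new LatestTimeTracker(sink)

    tracker.checkpoint(2000L) // generating batch t+1
    tracker.checkpoint(1000L) // completion of the older batch t arrives late

    // Both writes must target the file named after the latest checkpoint time.
    verify(sink, times(2)).write("checkpoint-2000")
    println("latest checkpoint time is respected")
  }
}
```

The verification asserts that both the early and the late checkpoint write to the file named after the latest checkpoint time, which is the behavior tdas asked to be covered.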

@SparkQA commented Nov 17, 2015

Test build #46113 has finished for PR 9707 at commit fa580b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas (Contributor) commented Nov 17, 2015

Thanks @zsxwing and @harishreedharan. Merging this to master, 1.6 and 1.5

asfgit pushed a commit that referenced this pull request Nov 17, 2015
… a batch

We write a checkpoint when generating a batch and when completing a batch. When the processing time of a batch is greater than the batch interval, checkpointing for the completion of an old batch may run after checkpointing for the generation of a new batch. If this happens, the checkpoint of the old batch actually has the latest information, so we want to recover from it. This PR uses the latest checkpoint time as the file name, so that we can always recover from the latest checkpoint file.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9707 from zsxwing/fix-checkpoint.

(cherry picked from commit 928d631)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit closed this in 928d631 on Nov 17, 2015
@zsxwing deleted the fix-checkpoint branch on November 17, 2015 at 23:01