
[SPARK-11740][Streaming]Fix the race condition of two checkpoints in a batch #9707


Closed · wants to merge 3 commits
9 changes: 4 additions & 5 deletions python/pyspark/streaming/tests.py
@@ -630,7 +630,6 @@ def tearDown(self):
if self.cpd is not None:
shutil.rmtree(self.cpd)

- @unittest.skip("Enable it when we fix the checkpoint bug")
def test_get_or_create_and_get_active_or_create(self):
inputd = tempfile.mkdtemp()
outputd = tempfile.mkdtemp() + "/"
@@ -699,11 +698,11 @@ def check_output(n):
# Verify that getOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
- sc = SparkContext(SparkConf())
+ self.sc = SparkContext(conf=SparkConf())
self.setupCalled = False
Contributor: is this change needed for this PR?

Member Author: Just to make sure tearDown can shut it down if there is any exception.

self.ssc = StreamingContext.getOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
- self.assertTrue(self.ssc.sparkContext == sc)
+ self.assertTrue(self.ssc.sparkContext == self.sc)

# Verify the getActiveOrCreate() recovers from checkpoint files
self.ssc.stop(True, True)
@@ -722,11 +721,11 @@ def check_output(n):
# Verify that getActiveOrCreate() uses existing SparkContext
self.ssc.stop(True, True)
time.sleep(1)
- self.sc = SparkContext(SparkConf())
+ self.sc = SparkContext(conf=SparkConf())
self.setupCalled = False
self.ssc = StreamingContext.getActiveOrCreate(self.cpd, setup)
self.assertFalse(self.setupCalled)
- self.assertTrue(self.ssc.sparkContext == sc)
+ self.assertTrue(self.ssc.sparkContext == self.sc)

# Verify that getActiveOrCreate() calls setup() in absence of checkpoint files
self.ssc.stop(True, True)
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

@@ -187,16 +187,30 @@ class CheckpointWriter(
private var stopped = false
private var fs_ : FileSystem = _

+ @volatile private var latestCheckpointTime: Time = null

class CheckpointWriteHandler(
checkpointTime: Time,
bytes: Array[Byte],
clearCheckpointDataLater: Boolean) extends Runnable {
def run() {
+ if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
+   latestCheckpointTime = checkpointTime
+ }
var attempts = 0
val startTime = System.currentTimeMillis()
val tempFile = new Path(checkpointDir, "temp")
- val checkpointFile = Checkpoint.checkpointFile(checkpointDir, checkpointTime)
- val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, checkpointTime)
+ // We will do checkpoint when generating a batch and completing a batch. When the processing
+ // time of a batch is greater than the batch interval, checkpointing for completing an old
+ // batch may run after checkpointing for a new batch. If this happens, the checkpoint of the
+ // old batch actually has the latest information, so we want to recover from it. Therefore,
+ // we also use the latest checkpoint time as the file name, so that we can recover from the
+ // latest checkpoint file.
+ //
+ // Note: there is only one thread writing the checkpoint files, so we don't need to worry
+ // about thread-safety.
+ val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
Contributor: So the idea here is that if an older batch's completion-checkpoint comes in after a new batch's initial-checkpoint, we overwrite the initial checkpoint (since we would not reset the latestCheckpointTime)? This actually could essentially mean two checkpoints being written to the same files.

Member Author:
> This actually could essentially mean two checkpoints being written to the same files.

The previous file will be renamed to backupFile.

Contributor: I don't think you get what I am saying. I am saying that two threads could run at the same time, writing out data to the exact same files.

If I am not mistaken, there is a bug here that could lead to two checkpoints running at the same time, writing to the same files:
- Checkpoint 1: completion of batch time t
- Checkpoint 2: start of batch time t+1

Checkpoint 2 starts -> latestCheckpointTime = t+1.
Checkpoint 1 starts -> since latestCheckpointTime != null and latestCheckpointTime > checkpointTime, we would not reset latestCheckpointTime, so both checkpoints would use the same file name to write their checkpoints out.

Because of this, whichever thread reaches the tempFile creation first would win, which is non-deterministic. The other thread would end up hitting an exception.

Member Author: There is only one thread writing the checkpoint files. See `val executor = Executors.newFixedThreadPool(1)` in CheckpointWriter.

Contributor: Ok, then we are fine. Can you put in a comment where the executor is being created, so we don't end up causing a bug due to this class not being thread-safe?

Member Author: Updated

+ val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)

while (attempts < MAX_ATTEMPTS && !stopped) {
attempts += 1
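To make the resolution of the thread above concrete, here is a minimal, self-contained sketch of the two properties the discussion settles on. This is a simplified stand-in, not Spark's actual source; `CheckpointWriterSketch`, its `Time` case class, and the `write` method are assumed names for illustration. A single-thread executor serializes all writes (so `latestCheckpointTime` needs only `@volatile` for visibility, not a lock), and a write arriving with an older batch time is still stored under the latest checkpoint time seen so far:

```scala
import java.util.concurrent.Executors

object CheckpointWriterSketch {
  // Simplified stand-in for org.apache.spark.streaming.Time.
  case class Time(ms: Long) extends Ordered[Time] {
    def compare(that: Time): Int = ms.compare(that.ms)
  }

  // Written only from the single writer thread; @volatile gives visibility.
  @volatile private var latestCheckpointTime: Time = null

  // One writer thread, as in CheckpointWriter's Executors.newFixedThreadPool(1):
  // submitted handlers queue up and run strictly one after another.
  private val executor = Executors.newFixedThreadPool(1)

  def write(checkpointTime: Time): Unit = executor.execute(new Runnable {
    def run(): Unit = {
      // A completion checkpoint for batch t may arrive after the start
      // checkpoint for batch t+1; it still carries the newest state, so it
      // is written under the latest checkpoint time seen so far.
      if (latestCheckpointTime == null || latestCheckpointTime < checkpointTime) {
        latestCheckpointTime = checkpointTime
      }
      println(s"payload for $checkpointTime written as checkpoint-${latestCheckpointTime.ms}")
    }
  })

  def main(args: Array[String]): Unit = {
    write(Time(2000)) // start of batch t+1 happens to checkpoint first...
    write(Time(1000)) // ...completion of batch t reuses the name for time 2000
    executor.shutdown()
  }
}
```

Submitting `Time(2000)` and then `Time(1000)` reproduces the scenario exercised by the test added below: the second write keeps the file name derived from 2000 even though its payload is the newer state.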
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

@@ -18,7 +18,6 @@
package org.apache.spark.streaming

import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
- import org.apache.spark.TestUtils

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag
@@ -30,11 +29,13 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
+ import org.mockito.Mockito.mock
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

+ import org.apache.spark.TestUtils
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
- import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
+ import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}

/**
@@ -611,6 +612,28 @@ class CheckpointSuite extends TestSuiteBase {
assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
}

test("SPARK-11267: the race condition of two checkpoints in a batch") {
val jobGenerator = mock(classOf[JobGenerator])
val checkpointDir = Utils.createTempDir().toString
val checkpointWriter =
new CheckpointWriter(jobGenerator, conf, checkpointDir, new Configuration())
val bytes1 = Array.fill[Byte](10)(1)
new checkpointWriter.CheckpointWriteHandler(
Time(2000), bytes1, clearCheckpointDataLater = false).run()
val bytes2 = Array.fill[Byte](10)(2)
new checkpointWriter.CheckpointWriteHandler(
Time(1000), bytes2, clearCheckpointDataLater = true).run()
val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir).reverse.map { path =>
new File(path.toUri)
}
assert(checkpointFiles.size === 2)
// Although bytes2 was written with an old time, it contains the latest status, so we should
// try to read from it at first.
assert(Files.toByteArray(checkpointFiles(0)) === bytes2)
assert(Files.toByteArray(checkpointFiles(1)) === bytes1)
checkpointWriter.stop()
}

/**
* Tests a streaming operation under checkpointing, by restarting the operation
* from a checkpoint file and verifying whether the final output is correct.
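As a closing illustration of why the test above expects the newest bytes at index 0: recovery wants to try the newest checkpoint file first, so the file holding the latest state must also carry the latest time in its name. A hypothetical helper sketching that newest-first scan (`RecoverySketch` and `latestReadableCheckpoint` are assumed names, and the name-based sort is a simplification; this is not Spark's actual recovery code):

```scala
import java.io.File
import java.nio.file.Files
import scala.util.{Success, Try}

object RecoverySketch {
  // Try candidate checkpoint files newest-first and return the first one
  // that can actually be read, mirroring getCheckpointFiles(...).reverse.
  def latestReadableCheckpoint(dir: File): Option[Array[Byte]] = {
    val candidates = Option(dir.listFiles()).getOrElse(Array.empty[File]).toSeq
      .filter(_.getName.startsWith("checkpoint-"))
      .sortBy(_.getName) // simplification: assumes names sort in time order
      .reverse           // newest first
    candidates.iterator
      .map(f => Try(Files.readAllBytes(f.toPath)))
      .collectFirst { case Success(bytes) => bytes }
  }
}
```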