
Commit fb1fbca

[SPARK-4027][Streaming] WriteAheadLogBackedBlockRDD to read received data either from BlockManager or WAL in HDFS
As part of the initiative of preventing data loss on streaming driver failure, this sub-task implements a BlockRDD that is backed by HDFS. This BlockRDD can either read data from Spark's BlockManager or read the data from file segments of the write ahead log in HDFS. Most of this code was written by @harishreedharan.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Hari Shreedharan <hshreedharan@apache.org>

Closes apache#2931 from tdas/driver-ha-rdd and squashes the following commits:

209e49c [Tathagata Das] Better fix to style issue.
4a5866f [Tathagata Das] Addressed one more comment.
ed5fbf0 [Tathagata Das] Minor updates.
b0a18b1 [Tathagata Das] Fixed import order.
20aa7c6 [Tathagata Das] Fixed more line length issues.
29aa099 [Tathagata Das] Fixed line length issues.
9e47b5b [Tathagata Das] Renamed class, simplified+added unit tests.
6e1bfb8 [Tathagata Das] Tweaks testuite to create spark contxt lazily to prevent contxt leaks.
9c86a61 [Tathagata Das] Merge pull request alteryx#22 from harishreedharan/driver-ha-rdd
2878c38 [Hari Shreedharan] Shutdown spark context after tests. Formatting/minor fixes
c709f2f [Tathagata Das] Merge pull request alteryx#21 from harishreedharan/driver-ha-rdd
5cce16f [Hari Shreedharan] Make sure getBlockLocations uses offset and length to find the blocks on HDFS
eadde56 [Tathagata Das] Transferred HDFSBackedBlockRDD for the driver-ha-working branch
1 parent (234de92) · commit fb1fbca
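The essence of the change is a fallback read path: a partition's data is served from the BlockManager when the block is still cached, and re-read from its write ahead log segment in HDFS otherwise. A minimal sketch of that pattern follows; the readFromWAL parameter is a stand-in for the WriteAheadLogRandomReader-based fallback and is not an API introduced by this patch. The real logic lives in WriteAheadLogBackedBlockRDD.compute in the diff below.

// Conceptual sketch only, assuming code inside Spark's streaming module;
// `readFromWAL` is an illustrative placeholder, not part of this commit.
import org.apache.spark.SparkEnv
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

def readPartition[T](
    blockId: BlockId,
    segment: WriteAheadLogFileSegment,
    readFromWAL: WriteAheadLogFileSegment => Iterator[T]): Iterator[T] = {
  SparkEnv.get.blockManager.get(blockId) match {
    case Some(block) => block.data.asInstanceOf[Iterator[T]] // fast path: block still in BlockManager
    case None        => readFromWAL(segment)                 // fallback: re-read the segment from the HDFS WAL
  }
}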

File tree
4 files changed: +285 −3 lines

core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
Lines changed: 4 additions & 0 deletions

@@ -84,5 +84,9 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
         "Attempted to use %s after its blocks have been removed!".format(toString))
     }
   }
+
+  protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
+    locations_
+  }
 }
 

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
Lines changed: 125 additions & 0 deletions

@@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.rdd

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}

/**
 * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
 * It contains information about the id of the blocks having this partition's data and
 * the segment of the write ahead log that backs the partition.
 * @param index index of the partition
 * @param blockId id of the block having the partition data
 * @param segment segment of the write ahead log having the partition data
 */
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
    val index: Int,
    val blockId: BlockId,
    val segment: WriteAheadLogFileSegment)
  extends Partition


/**
 * This class represents a special case of the BlockRDD where the data blocks in
 * the block manager are also backed by segments in write ahead logs. For reading
 * the data, this RDD first looks up the blocks by their ids in the block manager.
 * If it does not find them, it looks up the corresponding file segment.
 *
 * @param sc SparkContext
 * @param hadoopConfig Hadoop configuration
 * @param blockIds Ids of the blocks that contains this RDD's data
 * @param segments Segments in write ahead logs that contain this RDD's data
 * @param storeInBlockManager Whether to store in the block manager after reading from the segment
 * @param storageLevel storage level to store when storing in block manager
 *                     (applicable when storeInBlockManager = true)
 */
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient hadoopConfig: Configuration,
    @transient blockIds: Array[BlockId],
    @transient segments: Array[WriteAheadLogFileSegment],
    storeInBlockManager: Boolean,
    storageLevel: StorageLevel)
  extends BlockRDD[T](sc, blockIds) {

  require(
    blockIds.length == segments.length,
    s"Number of block ids (${blockIds.length}) must be " +
      s"the same as number of segments (${segments.length}})!")

  // Hadoop configuration is not serializable, so broadcast it as a serializable.
  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)

  override def getPartitions: Array[Partition] = {
    assertValid()
    Array.tabulate(blockIds.size) { i =>
      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
    }
  }

  /**
   * Gets the partition data by getting the corresponding block from the block manager.
   * If the block does not exist, then the data is read from the corresponding segment
   * in write ahead log files.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val hadoopConf = broadcastedHadoopConf.value
    val blockManager = SparkEnv.get.blockManager
    val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
    val blockId = partition.blockId
    blockManager.get(blockId) match {
      case Some(block) => // Data is in Block Manager
        val iterator = block.data.asInstanceOf[Iterator[T]]
        logDebug(s"Read partition data of $this from block manager, block $blockId")
        iterator
      case None => // Data not found in Block Manager, grab it from write ahead log file
        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
        val dataRead = reader.read(partition.segment)
        reader.close()
        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
        if (storeInBlockManager) {
          blockManager.putBytes(blockId, dataRead, storageLevel)
          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
          dataRead.rewind()
        }
        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
    }
  }

  /**
   * Get the preferred location of the partition. This returns the locations of the block
   * if it is present in the block manager, else it returns the location of the
   * corresponding segment in HDFS.
   */
  override def getPreferredLocations(split: Partition): Seq[String] = {
    val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
    val blockLocations = getBlockIdLocations().get(partition.blockId)
    def segmentLocations = HdfsUtils.getFileSegmentLocations(
      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
    blockLocations.getOrElse(segmentLocations)
  }
}
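As a hedged usage sketch (not code from this patch), the driver-side recovery path would pair each received block id with the WAL segment that persisted it and hand both arrays to the new RDD: blocks still cached on executors are read locally, while lost blocks are re-read from HDFS and optionally re-cached. The helper name recoverBatch and the chosen storage level are illustrative assumptions, and since the class is private[streaming] this would have to live inside the org.apache.spark.streaming package.

// Illustrative sketch, assuming code inside the org.apache.spark.streaming package.
// `recoverBatch` is a hypothetical helper, not part of this commit.
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

def recoverBatch(
    sc: SparkContext,
    hadoopConf: Configuration,
    blockIds: Array[BlockId],
    segments: Array[WriteAheadLogFileSegment]): Array[String] = {
  // One partition per (blockId, segment) pair; blocks missing from the
  // BlockManager are read back from their WAL segments and re-cached.
  val rdd = new WriteAheadLogBackedBlockRDD[String](
    sc, hadoopConf, blockIds, segments,
    storeInBlockManager = true, StorageLevel.MEMORY_AND_DISK_SER)
  rdd.collect()
}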

streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
Lines changed: 5 additions & 3 deletions

@@ -52,12 +52,14 @@ private[streaming] object HdfsUtils {
     }
   }
 
-  def getBlockLocations(path: String, conf: Configuration): Option[Array[String]] = {
+  /** Get the locations of the HDFS blocks containing the given file segment. */
+  def getFileSegmentLocations(
+      path: String, offset: Long, length: Long, conf: Configuration): Array[String] = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
-    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen))
-    blockLocs.map(_.flatMap(_.getHosts))
+    val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
+    blockLocs.map(_.flatMap(_.getHosts)).getOrElse(Array.empty)
   }
 
   def getFileSystemForPath(path: Path, conf: Configuration): FileSystem = {
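For illustration, a direct call to the renamed helper might look like the sketch below; the log path and segment bounds are made-up values, and only getFileSegmentLocations itself comes from this patch. Returning an empty array instead of None lets getPreferredLocations fall through to "no locality preference" without extra unwrapping.

// Hypothetical example values; assumes code inside Spark's streaming module,
// since HdfsUtils and WriteAheadLogFileSegment are private[streaming].
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment}

val hadoopConf = new Configuration()
val segment = new WriteAheadLogFileSegment("hdfs:///checkpoint/receivedData/log-0", 0L, 4096)
// Locations of the HDFS blocks that hold just this byte range of the log file.
val hosts: Array[String] =
  HdfsUtils.getFileSegmentLocations(segment.path, segment.offset, segment.length, hadoopConf)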
streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala
Lines changed: 151 additions & 0 deletions

@@ -0,0 +1,151 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.streaming.rdd

import java.io.File

import scala.util.Random

import com.google.common.io.Files
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfterAll, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogWriter}

class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName(this.getClass.getSimpleName)
  val hadoopConf = new Configuration()

  var sparkContext: SparkContext = null
  var blockManager: BlockManager = null
  var dir: File = null

  override def beforeAll(): Unit = {
    sparkContext = new SparkContext(conf)
    blockManager = sparkContext.env.blockManager
    dir = Files.createTempDir()
  }

  override def afterAll(): Unit = {
    // Copied from LocalSparkContext, simpler than to introduced test dependencies to core tests.
    sparkContext.stop()
    dir.delete()
    System.clearProperty("spark.driver.port")
  }

  test("Read data available in block manager and write ahead log") {
    testRDD(5, 5)
  }

  test("Read data available only in block manager, not in write ahead log") {
    testRDD(5, 0)
  }

  test("Read data available only in write ahead log, not in block manager") {
    testRDD(0, 5)
  }

  test("Read data available only in write ahead log, and test storing in block manager") {
    testRDD(0, 5, testStoreInBM = true)
  }

  test("Read data with partially available in block manager, and rest in write ahead log") {
    testRDD(3, 2)
  }

  /**
   * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
   * and the rest to a write ahead log, and then reading reading it all back using the RDD.
   * It can also test if the partitions that were read from the log were again stored in
   * block manager.
   * @param numPartitionsInBM Number of partitions to write to the Block Manager
   * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
   * @param testStoreInBM Test whether blocks read from log are stored back into block manager
   */
  private def testRDD(numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
    val numBlocks = numPartitionsInBM + numPartitionsInWAL
    val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))

    // Put the necessary blocks in the block manager
    val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
    data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
      blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
    }

    // Generate write ahead log segments
    val segments = generateFakeSegments(numPartitionsInBM) ++
      writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))

    // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
    require(
      blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
      "Expected blocks not in BlockManager"
    )
    require(
      blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty),
      "Unexpected blocks in BlockManager"
    )

    // Make sure that the right `numPartitionsInWAL` blocks are in write ahead logs, and other are not
    require(
      segments.takeRight(numPartitionsInWAL).forall(s =>
        new File(s.path.stripPrefix("file://")).exists()),
      "Expected blocks not in write ahead log"
    )
    require(
      segments.take(numPartitionsInBM).forall(s =>
        !new File(s.path.stripPrefix("file://")).exists()),
      "Unexpected blocks in write ahead log"
    )

    // Create the RDD and verify whether the returned data is correct
    val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
      segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
    assert(rdd.collect() === data.flatten)

    if (testStoreInBM) {
      val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, hadoopConf, blockIds.toArray,
        segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
      assert(rdd2.collect() === data.flatten)
      assert(
        blockIds.forall(blockManager.get(_).nonEmpty),
        "All blocks not found in block manager"
      )
    }
  }

  private def writeLogSegments(
      blockData: Seq[Seq[String]],
      blockIds: Seq[BlockId]
    ): Seq[WriteAheadLogFileSegment] = {
    require(blockData.size === blockIds.size)
    val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf)
    val segments = blockData.zip(blockIds).map { case (data, id) =>
      writer.write(blockManager.dataSerialize(id, data.iterator))
    }
    writer.close()
    segments
  }

  private def generateFakeSegments(count: Int): Seq[WriteAheadLogFileSegment] = {
    Array.fill(count)(new WriteAheadLogFileSegment("random", 0l, 0))
  }
}
