
Commit 9e47b5b

Renamed class, simplified + added unit tests.

1 parent: 6e1bfb8

File tree

5 files changed: +261 −257 lines

streaming/src/main/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDD.scala

Lines changed: 0 additions & 92 deletions
This file was deleted.
streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala (new file, per the renamed class and package below)

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.rdd

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration

import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.util.{HdfsUtils, WriteAheadLogFileSegment, WriteAheadLogRandomReader}

/**
 * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
 * It contains information about the id of the block having this partition's data and
 * the segment of the write ahead log that backs the partition.
 *
 * @param index index of the partition
 * @param blockId id of the block having the partition data
 * @param segment segment of the write ahead log having the partition data
 */
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
    val index: Int,
    val blockId: BlockId,
    val segment: WriteAheadLogFileSegment
  ) extends Partition


/**
 * This class represents a special case of the BlockRDD where the data blocks in
 * the block manager are also backed by segments in write ahead logs. For reading
 * the data, this RDD first looks up the blocks by their ids in the block manager.
 * If it does not find them, it reads the data from the corresponding file segments.
 *
 * @param sc SparkContext
 * @param hadoopConfiguration Hadoop configuration
 * @param blockIds Ids of the blocks that contain this RDD's data
 * @param segments Segments in write ahead logs that contain this RDD's data
 * @param storeInBlockManager Whether to store in the block manager after reading
 *                            from the log segment
 * @param storageLevel storage level to use when storing in the block manager
 *                     (applicable when storeInBlockManager = true)
 */
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient hadoopConfiguration: Configuration,
    @transient override val blockIds: Array[BlockId],
    @transient val segments: Array[WriteAheadLogFileSegment],
    val storeInBlockManager: Boolean,
    val storageLevel: StorageLevel
  ) extends BlockRDD[T](sc, blockIds) {

  require(blockIds.length == segments.length,
    s"Number of block ids (${blockIds.length}) must be the same as number of segments " +
      s"(${segments.length})!")

  // The Hadoop configuration is not serializable, so wrap it in a SerializableWritable
  // that can be shipped with the tasks.
  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfiguration)

  override def getPartitions: Array[Partition] = {
    assertValid()
    Array.tabulate(blockIds.size) { i =>
      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
    }
  }

  /**
   * Gets the partition data by getting the corresponding block from the block manager.
   * If the block does not exist, the data is read from the corresponding segment in the
   * write ahead log files.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val hadoopConf = broadcastedHadoopConf.value
    val blockManager = SparkEnv.get.blockManager
    val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
    val blockId = partition.blockId
    blockManager.get(blockId) match {
      case Some(block) => // Data is in the block manager
        val iterator = block.data.asInstanceOf[Iterator[T]]
        logDebug(s"Read partition data of RDD $this from block manager, block $blockId")
        iterator
      case None => // Data not found in the block manager, grab it from the write ahead log file
        val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
        val dataRead = reader.read(partition.segment)
        reader.close()
        logInfo(s"Read partition data of RDD $this from write ahead log, segment ${partition.segment}")
        if (storeInBlockManager) {
          blockManager.putBytes(blockId, dataRead, storageLevel)
          logDebug(s"Stored partition data of RDD $this into block manager with level $storageLevel")
          dataRead.rewind()
        }
        blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
    }
  }

  /**
   * Gets the preferred location of the partition. This returns the locations of the
   * block if it is present in the block manager, else the location of the corresponding
   * segment in HDFS.
   */
  override def getPreferredLocations(split: Partition): Seq[String] = {
    val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
    val blockLocations = getBlockIdLocations().get(partition.blockId)
    lazy val segmentLocations = HdfsUtils.getBlockLocations(
      partition.segment.path, partition.segment.offset,
      partition.segment.length, hadoopConfiguration)
    blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
  }
}
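
To make the recovery path concrete, here is a minimal usage sketch. It is hypothetical, not part of this commit: the block ids, WAL path, offsets, and lengths are invented for illustration; WriteAheadLogFileSegment is assumed to take (path, offset, length), as suggested by its usage above; and the example lives in the same package because the RDD is private[streaming].

package org.apache.spark.streaming.rdd

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

object WriteAheadLogBackedBlockRDDExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("wal-backed-rdd-sketch").setMaster("local[2]"))

    // One block id per partition, each paired with the (hypothetical) WAL segment
    // that holds a copy of that block's data.
    val blockIds = Array.tabulate[BlockId](4)(i => StreamBlockId(streamId = 0, uniqueId = i))
    val segments = Array.tabulate(4) { i =>
      new WriteAheadLogFileSegment("hdfs:///checkpoint/receivedData/log-0", i * 1000L, 1000L)
    }

    val rdd = new WriteAheadLogBackedBlockRDD[String](
      sc,
      new Configuration(),
      blockIds,
      segments,
      storeInBlockManager = true, // re-cache any block that had to be read from the WAL
      storageLevel = StorageLevel.MEMORY_ONLY_SER)

    // Partitions whose blocks are still in the block manager are served from it;
    // the rest are re-read from their WAL segments (and re-cached, per the flag above).
    println(rdd.count())
    sc.stop()
  }
}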

streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala

Lines changed: 1 addition & 2 deletions
@@ -52,8 +52,7 @@ private[streaming] object HdfsUtils {
     }
   }
 
-  def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration):
-      Option[Array[String]] = {
+  def getBlockLocations(path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
     val fileStatus = dfs.getFileStatus(dfsPath)
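
The hunk ends before the method's return expression. For orientation, a hedged sketch of how the rest of the body could look, assuming the standard Hadoop FileSystem#getFileBlockLocations API; the actual remainder of the method is simply not shown in this excerpt.

// Sketch of a possible completion, not the verbatim method body from the commit.
def getBlockLocations(
    path: String, offset: Long, length: Long, conf: Configuration): Option[Seq[String]] = {
  val dfsPath = new Path(path)
  val dfs = getFileSystemForPath(dfsPath, conf)
  val fileStatus = dfs.getFileStatus(dfsPath)
  // Each BlockLocation describes a byte range of the file and the hosts that
  // store it; collect the distinct hosts covering [offset, offset + length).
  val blockLocs = Option(dfs.getFileBlockLocations(fileStatus, offset, length))
  blockLocs.map(_.flatMap(_.getHosts).distinct.toSeq)
}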

streaming/src/test/scala/org/apache/spark/streaming/rdd/HDFSBackedBlockRDDSuite.scala

Lines changed: 0 additions & 163 deletions
This file was deleted.
