@@ -41,41 +41,47 @@ class WriteAheadLogBackedBlockRDDPartition(
 /**
- * This class represents a special case of the BlockRDD where the data blocks in the block manager are also
- * backed by segments in write ahead logs. For reading the data, this RDD first looks up the blocks by their ids
- * in the block manager. If it does not find them, it looks up the corresponding file segment.
+ * This class represents a special case of the BlockRDD where the data blocks in
+ * the block manager are also backed by segments in write ahead logs. For reading
+ * the data, this RDD first looks up the blocks by their ids in the block manager.
+ * If it does not find them, it looks up the corresponding file segment.
  *
  * @param sc SparkContext
- * @param hadoopConfiguration Hadoop configuration
+ * @param hadoopConfig Hadoop configuration
  * @param blockIds Ids of the blocks that contain this RDD's data
  * @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the log segment
- * @param storageLevel storage level to store when storing in block manager (applicable when storeInBlockManager = true)
+ * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param storageLevel Storage level to use when storing in the block manager
+ *                     (applicable when storeInBlockManager = true)
  */
 private[streaming]
 class WriteAheadLogBackedBlockRDD[T: ClassTag](
     @transient sc: SparkContext,
-    @transient hadoopConfiguration: Configuration,
+    @transient hadoopConfig: Configuration,
     @transient override val blockIds: Array[BlockId],
     @transient val segments: Array[WriteAheadLogFileSegment],
     val storeInBlockManager: Boolean,
     val storageLevel: StorageLevel
   ) extends BlockRDD[T](sc, blockIds) {
 
-  require(blockIds.length == segments.length,
-    s"Number of block ids (${blockIds.length}) must be the same as number of segments (${segments.length})!")
+  require(
+    blockIds.length == segments.length,
+    s"Number of block ids (${blockIds.length}) must be " +
+      s"the same as number of segments (${segments.length})!")
 
   // Hadoop configuration is not serializable, so wrap it in a SerializableWritable to broadcast it.
-  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfiguration)
+  private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
 
   override def getPartitions: Array[Partition] = {
     assertValid()
-    Array.tabulate(blockIds.size){ i => new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
+    Array.tabulate(blockIds.size) { i =>
+      new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
   }
 
   /**
-   * Gets the partition data by getting the corresponding block from the block manager. If the block does not
-   * exist, then the data is read from the corresponding segment in write ahead log files.
+   * Gets the partition data by getting the corresponding block from the block manager.
+   * If the block does not exist, then the data is read from the corresponding segment
+   * in write ahead log files.
    */
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     assertValid()
@@ -86,31 +92,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
     blockManager.get(blockId) match {
       case Some(block) => // Data is in Block Manager
         val iterator = block.data.asInstanceOf[Iterator[T]]
-        logDebug(s"Read partition data of RDD $this from block manager, block $blockId")
+        logDebug(s"Read partition data of $this from block manager, block $blockId")
         iterator
       case None => // Data not found in Block Manager, grab it from write ahead log file
         val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
         val dataRead = reader.read(partition.segment)
         reader.close()
-        logInfo(s"Read partition data of RDD $this from write ahead log, segment ${partition.segment}")
+        logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
         if (storeInBlockManager) {
           blockManager.putBytes(blockId, dataRead, storageLevel)
-          logDebug(s"Stored partition data of RDD $this into block manager with level $storageLevel")
+          logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
           dataRead.rewind()
         }
         blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
     }
   }
 
   /**
-   * Get the preferred location of the partition. This returns the locations of the block if it is present in the
-   * block manager, else it returns the location of the corresponding segment in HDFS.
+   * Get the preferred location of the partition. This returns the locations of the block
+   * if it is present in the block manager, else it returns the location of the
+   * corresponding segment in HDFS.
    */
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
     val blockLocations = getBlockIdLocations().get(partition.blockId)
     lazy val segmentLocations = HdfsUtils.getBlockLocations(
-      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfiguration)
+      partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
     blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
   }
 }
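
For context, here is a rough sketch of how this RDD might be wired up on the driver during recovery. This is an illustration only: the helper name, the StreamBlockId values, and the WAL path are made up, and since both this class and WriteAheadLogFileSegment are private[streaming], real call sites (such as the receiver tracker) live inside Spark's streaming module.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}

// Hypothetical recovery helper; assumes it compiles inside Spark's streaming
// package so the private[streaming] types used below are visible.
def recoverReceivedBlocks(sc: SparkContext): Unit = {
  // Inputs that, in practice, come from the receiver tracker's records of
  // received blocks and their WAL writes. The values here are placeholders.
  val blockIds: Array[BlockId] = Array(StreamBlockId(streamId = 0, uniqueId = 1L))
  val segments = Array(
    new WriteAheadLogFileSegment("hdfs:///checkpoint/receivedData/log-0", 0L, 1024))

  // One partition per (blockId, segment) pair: compute() tries the block
  // manager first and falls back to the WAL segment, and with
  // storeInBlockManager = true it re-caches whatever it had to re-read.
  val rdd = new WriteAheadLogBackedBlockRDD[String](
    sc,
    new Configuration(),
    blockIds,
    segments,
    storeInBlockManager = true,
    storageLevel = StorageLevel.MEMORY_ONLY_SER)

  rdd.count() // forces a read; blocks lost by executors are served from the WAL
}
```

Note how getPreferredLocations keeps scheduling locality sensible in both paths: tasks run on executors that still hold the block, or, failing that, on nodes holding the HDFS blocks of the log segment.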