@@ -48,8 +48,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       case None =>
         // Acquire a lock for loading this partition
         // If another thread already holds the lock, wait for it to finish and return its results
-        acquireLockForPartition(key).foreach { values =>
-          return new InterruptibleIterator[T](context, values.asInstanceOf[Iterator[T]])
+        val storedValues = acquireLockForPartition[T](key)
+        if (storedValues.isDefined) {
+          return new InterruptibleIterator[T](context, storedValues.get)
         }
 
         // Otherwise, we have to load the partition ourselves
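Making acquireLockForPartition generic means the call site above gets a typed Option[Iterator[T]] back and no longer needs the asInstanceOf cast inside a foreach. A hypothetical, simplified sketch of that calling pattern (the helper names below are illustrative, not members of CacheManager):

// Hypothetical sketch: short-circuit with another thread's values when they exist,
// otherwise fall through and compute the partition ourselves.
def loadOrCompute[T](
    tryGetLoadedValues: () => Option[Iterator[T]],
    compute: () => Iterator[T]): Iterator[T] = {
  val storedValues = tryGetLoadedValues()
  if (storedValues.isDefined) {
    return storedValues.get // reuse the other thread's result, no recomputation
  }
  compute() // we now hold the loading "lock" and must produce the values ourselves
}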
@@ -64,7 +65,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
 
           // Otherwise, cache the values and keep track of any updates in block statuses
           val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-          val cachedValues = cacheValues(key, computedValues, storageLevel, updatedBlocks)
+          val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
           context.taskMetrics.updatedBlocks = Some(updatedBlocks)
           new InterruptibleIterator(context, cachedValues)
 
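The only change in this hunk is the rename to putInBlockManager; the surrounding bookkeeping is unchanged. As a hedged illustration of that bookkeeping, a minimal stand-in model (BlockId, BlockStatus, and the metrics class below are simplified placeholders, not Spark's real types):

import scala.collection.mutable.ArrayBuffer

final case class BlockId(name: String)
final case class BlockStatus(memSize: Long, diskSize: Long)
final class TaskMetricsSketch { var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None }

// Each put reports the blocks whose status changed; the accumulated list is then attached
// to the task's metrics, mirroring `updatedBlocks ++= blockManager.put(...)` and
// `context.taskMetrics.updatedBlocks = Some(updatedBlocks)` above.
def cacheAndRecord(metrics: TaskMetricsSketch, put: () => Seq[(BlockId, BlockStatus)]): Unit = {
  val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
  updatedBlocks ++= put()
  metrics.updatedBlocks = Some(updatedBlocks)
}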
@@ -83,10 +84,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
    * If the lock is free, just acquire it and return None. Otherwise, another thread is already
    * loading the partition, so we wait for it to finish and return the values loaded by the thread.
    */
-  private def acquireLockForPartition(id: RDDBlockId): Option[Iterator[Any]] = {
+  private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
     loading.synchronized {
       if (!loading.contains(id)) {
-        // If the partition is free, acquire its lock and begin computing its value
+        // If the partition is free, acquire its lock to compute its value
         loading.add(id)
         None
       } else {
@@ -101,17 +102,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           }
         }
         logInfo(s"Finished waiting for $id")
-        /* See whether someone else has successfully loaded it. The main way this would fail
-         * is for the RDD-level cache eviction policy if someone else has loaded the same RDD
-         * partition but we didn't want to make space for it. However, that case is unlikely
-         * because it's unlikely that two threads would work on the same RDD partition. One
-         * downside of the current code is that threads wait serially if this does happen. */
         val values = blockManager.get(id)
         if (!values.isDefined) {
+          /* The block is not guaranteed to exist even after the other thread has finished.
+           * For instance, the block could be evicted after it was put, but before our get.
+           * In this case, we still need to load the partition ourselves. */
           logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
           loading.add(id)
         }
-        values
+        values.map(_.asInstanceOf[Iterator[T]])
       }
     }
   }
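The wait loop itself (old lines 93-100 of the file) falls between these two hunks, so only its tail is visible here. What follows is a self-contained, hypothetical sketch of the overall pattern acquireLockForPartition implements, not the elided code: a `loading` set doubles as the lock table and the monitor that waiters block on, and the block is re-checked after waking because it may have been evicted in the meantime. Key and value types are simplified.

import scala.collection.mutable

object PartitionLoadSketch {
  private val loading = new mutable.HashSet[String]
  private val store = new mutable.HashMap[String, Vector[Int]]

  /** Returns None if we acquired the "lock", or the other thread's values if it finished first. */
  def acquireLockForPartition(id: String): Option[Iterator[Int]] = loading.synchronized {
    if (!loading.contains(id)) {
      loading.add(id) // we are now responsible for computing this partition
      None
    } else {
      while (loading.contains(id)) {
        loading.wait() // woken by the loader's notifyAll()
      }
      // The block may still be missing (e.g. evicted between the put and this get),
      // in which case we re-acquire the lock and compute it ourselves.
      val values = store.get(id)
      if (values.isEmpty) { loading.add(id) }
      values.map(_.iterator)
    }
  }

  /** The loading thread releases the lock and wakes any waiters when it is done. */
  def finishLoading(id: String): Unit = loading.synchronized {
    loading.remove(id)
    loading.notifyAll()
  }
}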
@@ -120,45 +119,46 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
    * Cache the values of a partition, keeping track of any updates in the storage statuses
    * of other blocks along the way.
    */
-  private def cacheValues[T](
+  private def putInBlockManager[T](
       key: BlockId,
-      value: Iterator[T],
+      values: Iterator[T],
       storageLevel: StorageLevel,
       updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Iterator[T] = {
 
-    if (!storageLevel.useMemory) {
-      /* This RDD is not to be cached in memory, so we can just pass the computed values
-       * as an iterator directly to the BlockManager, rather than first fully unrolling
-       * it in memory. The latter option potentially uses much more memory and risks OOM
-       * exceptions that can be avoided. */
-      assume(storageLevel.useDisk || storageLevel.useOffHeap, s"Empty storage level for $key!")
-      updatedBlocks ++= blockManager.put(key, value, storageLevel, tellMaster = true)
-      blockManager.get(key) match {
-        case Some(values) =>
-          values.asInstanceOf[Iterator[T]]
-        case None =>
-          logInfo(s"Failure to store $key")
-          throw new BlockException(key, s"Block manager failed to return cached value for $key!")
-      }
-    } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
-       * to the BlockManager as an iterator and expect to read it back later. This is because
-       * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      if (storageLevel.deserialized) {
-        val elements = new ArrayBuffer[Any]
-        elements ++= value
-        updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-        elements.iterator.asInstanceOf[Iterator[T]]
+    val cachedValues = {
+      if (!storageLevel.useMemory) {
+        /* This RDD is not to be cached in memory, so we can just pass the computed values
+         * as an iterator directly to the BlockManager, rather than first fully unrolling
+         * it in memory. The latter option potentially uses much more memory and risks OOM
+         * exceptions that can be avoided. */
+        updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
+        blockManager.get(key) match {
+          case Some(v) => v
+          case None =>
+            logInfo(s"Failure to store $key")
+            throw new BlockException(key, s"Block manager failed to return cached value for $key!")
+        }
       } else {
-        /* This RDD is to be cached in memory in the form of serialized bytes. In this case,
-         * we only unroll the serialized form of the data, because the deserialized form may
-         * be much larger and may not fit in memory. */
-        val bytes = blockManager.dataSerialize(key, value)
-        updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true)
-        blockManager.dataDeserialize(key, bytes).asInstanceOf[Iterator[T]]
+        /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+         * to the BlockManager as an iterator and expect to read it back later. This is because
+         * we may end up dropping a partition from memory store before getting it back, e.g.
+         * when the entirety of the RDD does not fit in memory. */
+        if (storageLevel.deserialized) {
+          val elements = new ArrayBuffer[Any]
+          elements ++= values
+          updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
+          elements.iterator
+        } else {
+          /* This RDD is to be cached in memory in the form of serialized bytes. In this case,
+           * we only unroll the serialized form of the data, because the deserialized form may
+           * be much larger and may not fit in memory. */
+          val bytes = blockManager.dataSerialize(key, values)
+          updatedBlocks ++= blockManager.putBytes(key, bytes, storageLevel, tellMaster = true)
+          blockManager.dataDeserialize(key, bytes)
+        }
       }
     }
+    cachedValues.asInstanceOf[Iterator[T]]
   }
 
 }
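Taken together, the new putInBlockManager chooses one of three paths based on the storage level: stream the iterator straight to disk/off-heap storage, unroll to an in-memory array of objects, or materialize only the serialized bytes. A hedged, self-contained sketch of that decision against a stand-in store interface (the BlockStore trait and Level case class below are placeholders, not Spark's BlockManager API):

import scala.collection.mutable.ArrayBuffer

case class Level(useMemory: Boolean, deserialized: Boolean)

trait BlockStore {
  def putIterator(key: String, values: Iterator[Any]): Unit
  def putArray(key: String, values: Seq[Any]): Unit
  def putBytes(key: String, bytes: Array[Byte]): Unit
  def getIterator(key: String): Option[Iterator[Any]]
  def serialize(values: Iterator[Any]): Array[Byte]
  def deserialize(bytes: Array[Byte]): Iterator[Any]
}

def putSketch[T](store: BlockStore, key: String, values: Iterator[T], level: Level): Iterator[T] = {
  val cached: Iterator[Any] =
    if (!level.useMemory) {
      // Disk/off-heap only: pass the iterator through without unrolling it,
      // then read it back; failing to read it back is a hard error.
      store.putIterator(key, values)
      store.getIterator(key).getOrElse(sys.error(s"failed to store $key"))
    } else if (level.deserialized) {
      // In memory as objects: unroll fully first, because the memory store may
      // drop the block before we get a chance to read it back.
      val elements = new ArrayBuffer[Any]
      elements ++= values
      store.putArray(key, elements)
      elements.iterator
    } else {
      // In memory as serialized bytes: only the (usually smaller) serialized form
      // is materialized, and the returned iterator deserializes from those bytes.
      val bytes = store.serialize(values)
      store.putBytes(key, bytes)
      store.deserialize(bytes)
    }
  cached.asInstanceOf[Iterator[T]]
}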