@@ -180,6 +180,7 @@ private[sql] case class InMemoryColumnarTableScan(
     }
   }
 
+  // Accumulators used for testing purposes
   val readPartitions = sparkContext.accumulator(0)
   val readBatches = sparkContext.accumulator(0)
 
@@ -211,27 +212,14 @@ private[sql] case class InMemoryColumnarTableScan(
       }
 
       val nextRow = new SpecificMutableRow(requestedColumnDataTypes)
-      val rows = cachedBatchIterator
-        // Skip pruned batches
-        .filter { cachedBatch =>
-          if (inMemoryPartitionPruningEnabled && !partitionFilter(cachedBatch.stats)) {
-            def statsString = relation.partitionStatistics.schema
-              .zip(cachedBatch.stats)
-              .map { case (a, s) => s"${a.name}: $s" }
-              .mkString(", ")
-            logInfo(s"Skipping partition based on stats $statsString")
-            false
-          } else {
-            readBatches += 1
-            true
-          }
-        }
-        // Build column accessors
-        .map { cachedBatch =>
-          requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
-        }
-        // Extract rows via column accessors
-        .flatMap { columnAccessors =>
+
+      def cachedBatchesToRows(cacheBatches: Iterator[CachedBatch]) = {
+        val rows = cacheBatches.flatMap { cachedBatch =>
+          // Build column accessors
+          val columnAccessors =
+            requestedColumnIndices.map(cachedBatch.buffers(_)).map(ColumnAccessor(_))
+
+          // Extract rows via column accessors
           new Iterator[Row] {
             override def next() = {
               var i = 0
@@ -246,11 +234,34 @@ private[sql] case class InMemoryColumnarTableScan(
           }
         }
 
-      if (rows.hasNext) {
-        readPartitions += 1
+        if (rows.hasNext) {
+          readPartitions += 1
+        }
+
+        rows
       }
 
-      rows
+      // Do partition batch pruning if enabled
+      val cachedBatchesToScan =
+        if (inMemoryPartitionPruningEnabled) {
+          cachedBatchIterator.filter { cachedBatch =>
+            if (!partitionFilter(cachedBatch.stats)) {
+              def statsString = relation.partitionStatistics.schema
+                .zip(cachedBatch.stats)
+                .map { case (a, s) => s"${a.name}: $s" }
+                .mkString(", ")
+              logInfo(s"Skipping partition based on stats $statsString")
+              false
+            } else {
+              readBatches += 1
+              true
+            }
+          }
+        } else {
+          cachedBatchIterator
+        }
+
+      cachedBatchesToRows(cachedBatchesToScan)
     }
   }
 }
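
A minimal sketch (not part of the diff) of how this pruning path is typically exercised, assuming a `SQLContext` named `sqlContext`, a registered table `pruningData`, and that `inMemoryPartitionPruningEnabled` is backed by the conf key `spark.sql.inMemoryColumnarStorage.partitionPruning`; the table and column names are illustrative.

```scala
import org.apache.spark.sql.SQLContext

// Sketch: exercising in-memory batch pruning and the readPartitions / readBatches
// accumulators. The conf key is assumed to be what inMemoryPartitionPruningEnabled
// reads; "pruningData" and "key" are hypothetical names.
def scanWithBatchPruning(sqlContext: SQLContext) = {
  // Turn on partition (batch) pruning before the table is cached
  sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "true")
  sqlContext.cacheTable("pruningData")

  // The WHERE clause becomes partitionFilters; batches whose column stats rule out
  // key = 1 are skipped, so readBatches counts only the batches actually scanned
  sqlContext.sql("SELECT * FROM pruningData WHERE key = 1").collect()
}
```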