 * limitations under the License.
 */
package org.apache.spark.mllib.rdd

import scala.collection.mutable
import scala.reflect.ClassTag

import org.apache.spark.{TaskContext, Partition}
import org.apache.spark.rdd.RDD
/**
 * Partition of [[SlidingRDD]]: wraps one parent partition plus the items borrowed from the
 * following parent partition(s) that are needed to complete the sliding windows that start
 * near the end of `prev`.
 *
 * @param idx index of this partition within the SlidingRDD
 * @param prev the parent partition this partition is computed from
 * @param tail the first `windowSize - 1` (or fewer) items of the subsequent parent
 *             partition(s), appended after `prev`'s items
 */
private[mllib]
class SlidingRDDPartition[T](val idx: Int, val prev: Partition, val tail: Seq[T])
  extends Partition with Serializable {
  override val index: Int = idx
}
/**
 * Represents an RDD from grouping items of its parent RDD in fixed size blocks by passing a sliding
 * window over them. The ordering is first based on the partition index and then the ordering of
 * items within each partition. This is similar to sliding in Scala collections, except that it
 * becomes an empty RDD if the window size is greater than the total number of items. It needs to
 * trigger a Spark job if the parent RDD has more than one partitions. To make this operation
 * efficient, the number of items per partition should be larger than the window size and the
 * window size should be small, e.g., 2.
 *
 * @param parent the parent RDD
 * @param windowSize the window size, must be greater than 1
 *
 * @see [[org.apache.spark.mllib.rdd.RDDFunctions#sliding]]
 */
private[mllib]
class SlidingRDD[T: ClassTag](@transient val parent: RDD[T], val windowSize: Int)
  extends RDD[Seq[T]](parent) {

  require(windowSize > 1, s"Window size must be greater than 1, but got $windowSize.")

  /**
   * Computes the sliding windows for one partition: iterates over the parent partition's
   * items followed by the pre-collected `tail` (heads of subsequent partitions), so windows
   * spanning a partition boundary are produced here. `withPartial(false)` drops any trailing
   * window shorter than `windowSize`.
   */
  override def compute(split: Partition, context: TaskContext): Iterator[Seq[T]] = {
    val part = split.asInstanceOf[SlidingRDDPartition[T]]
    (firstParent[T].iterator(part.prev, context) ++ part.tail)
      .sliding(windowSize)
      .withPartial(false)
  }

  // Windows are computed from the parent partition's data, so prefer its locations.
  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[SlidingRDDPartition[T]].prev)

  /**
   * Builds the partitions. For a multi-partition parent this triggers a Spark job that
   * collects the first `windowSize - 1` items of every partition except the first; each
   * resulting partition carries the borrowed items as its `tail`. Parent partitions whose
   * contents are entirely consumed as some earlier partition's tail are skipped.
   */
  override def getPartitions: Array[Partition] = {
    val parentPartitions = parent.partitions
    val n = parentPartitions.size
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      // Single parent partition: no cross-partition windows, so no tail needed.
      Array(new SlidingRDDPartition[T](0, parentPartitions(0), Seq.empty))
    } else {
      val n1 = n - 1
      val w1 = windowSize - 1
      // Get the first w1 items of each partition, starting from the second partition.
      val nextHeads =
        parent.context.runJob(parent, (iter: Iterator[T]) => iter.take(w1).toArray, 1 until n, true)
      val partitions = mutable.ArrayBuffer[SlidingRDDPartition[T]]()
      var i = 0
      var partitionIndex = 0
      while (i < n1) {
        var j = i
        val tail = mutable.ListBuffer[T]()
        // Keep appending to the current tail until appended a head of size w1.
        // A head shorter than w1 means that whole parent partition fits inside the tail,
        // so it must not become a partition of its own.
        while (j < n1 && nextHeads(j).size < w1) {
          tail ++= nextHeads(j)
          j += 1
        }
        if (j < n1) {
          tail ++= nextHeads(j)
          j += 1
        }
        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(i), tail)
        partitionIndex += 1
        // Skip appended heads.
        i = j
      }
      // If the head of last partition has size w1, we also need to add this partition.
      if (nextHeads.last.size == w1) {
        partitions += new SlidingRDDPartition[T](partitionIndex, parentPartitions(n1), Seq.empty)
      }
      partitions.toArray
    }
  }
}
0 commit comments