
Commit da0091b

Author: Feynman Liang
Use lists for prefixes to reuse data
1 parent cb2a4fc commit da0091b
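
The point of the change: prepending to an immutable List with :: allocates one new cell whose tail is the existing prefix, so extending a prefix for many candidate items reuses the shared data, whereas ArrayBuffer's :+ copies the elements into a fresh buffer each time. A minimal standalone sketch of that difference (plain Scala, not part of this patch):

    import scala.collection.mutable.ArrayBuffer

    object PrefixSharingSketch {
      def main(args: Array[String]): Unit = {
        // Immutable List: the extended prefix's tail IS the old prefix (structural sharing).
        val prefix: List[Int] = List(3, 2, 1)   // most recently added item first
        val extended = 4 :: prefix              // O(1), no copying
        println(extended.tail eq prefix)        // true

        // ArrayBuffer: :+ builds a new buffer, duplicating the common elements.
        val buf = ArrayBuffer(1, 2, 3)
        val extendedBuf = buf :+ 4
        println(extendedBuf eq buf)             // false
      }
    }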

File tree: 1 file changed, +18 -30 lines changed


mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

Lines changed: 18 additions & 30 deletions
@@ -102,9 +102,10 @@ class PrefixSpan private (
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong

     val itemCounts = sequences
-      .flatMap(_.distinct.map((_, 1L)))
+      .flatMap(seq => seq.distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
+    var allPatternAndCounts = itemCounts.map(x => (List(x._1), x._2))

     val prefixSuffixPairs = {
       val frequentItems = itemCounts.map(_._1).collect()
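
In this hunk the per-item counts use seq.distinct, so an item's support is the number of sequences containing it rather than its total number of occurrences, and the items that clear minCount now seed allPatternAndCounts as length-1 patterns. A plain-Scala sketch of that counting rule on a made-up sequence database (illustration only, not Spark code):

    object SupportCountSketch {
      def main(args: Array[String]): Unit = {
        val sequences = Seq(Array(1, 2, 2, 3), Array(2, 3), Array(1, 1, 4))
        val minSupport = 0.5
        val minCount = math.ceil(sequences.size * minSupport).toLong  // 2 here

        // Count each item at most once per sequence, mirroring seq.distinct above.
        val itemCounts = sequences
          .flatMap(seq => seq.distinct.map(item => (item, 1L)))
          .groupBy(_._1)
          .map { case (item, pairs) => (item, pairs.map(_._2).sum) }
          .filter(_._2 >= minCount)

        // Frequent single items become the initial length-1 patterns.
        val allPatternAndCounts = itemCounts.map { case (item, count) => (List(item), count) }
        println(allPatternAndCounts.toList.sortBy(_._1.head))
        // List((List(1),2), (List(2),2), (List(3),2))
      }
    }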
@@ -114,14 +115,12 @@
       candidates.flatMap { x =>
         frequentItems.map { y =>
           val sub = LocalPrefixSpan.getSuffix(y, x)
-          (ArrayBuffer(y), sub)
+          (List(y), sub)
         }.filter(_._2.nonEmpty)
       }
     }
-    prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
-
-    var allPatternAndCounts = itemCounts.map(x => (ArrayBuffer(x._1), x._2))
     var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
+
     while (largePrefixSuffixPairs.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
@@ -135,9 +134,9 @@

     if (smallPrefixSuffixPairs.count() > 0) {
       val projectedDatabase = smallPrefixSuffixPairs
-        .map(x => (x._1.toSeq, x._2))
+        // TODO aggregateByKey
         .groupByKey()
-        .map(x => (x._1.toArray, x._2.toArray))
+        .mapValues(_.toArray)
       val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
       allPatternAndCounts ++= nextPatternAndCounts
     }
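
Replacing .map(x => (x._1.toArray, x._2.toArray)) with .mapValues(_.toArray) keeps the List[Int] prefix as the key unchanged and, because mapValues on a pair RDD retains the original partitioning, the result of groupByKey is not re-shuffled. A small self-contained sketch of the shape of this step, with made-up prefix/suffix pairs (illustration only, not the patch itself):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD

    object ProjectedDatabaseSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

        // Hypothetical (prefix, suffix) pairs: List[Int] prefixes, Array[Int] suffixes.
        val prefixSuffixPairs: RDD[(List[Int], Array[Int])] = sc.parallelize(Seq(
          (List(1), Array(2, 3)),
          (List(1), Array(3)),
          (List(2), Array(1, 3))))

        // Group suffixes by prefix, then convert each group; mapValues leaves the keys
        // (and the partitioning produced by groupByKey) untouched.
        val projectedDatabase: RDD[(List[Int], Array[Array[Int]])] = prefixSuffixPairs
          .groupByKey()
          .mapValues(_.toArray)

        projectedDatabase.collect().foreach { case (prefix, projDB) =>
          println(s"$prefix -> ${projDB.map(_.mkString(",")).mkString(" | ")}")
        }
        sc.stop()
      }
    }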
@@ -154,8 +153,8 @@
    * (RDD[prefix, suffix], RDD[prefix, suffix ])
    */
   private def splitPrefixSuffixPairs(
-      prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
-    (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
+    (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
     val prefixToSuffixSize = prefixSuffixPairs
       .aggregateByKey(0)(
         seqOp = { case (count, suffix) => count + suffix.length },
@@ -179,14 +178,14 @@
    */
   private def getPatternCountsAndPrefixSuffixPairs(
       minCount: Long,
-      prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]):
-    (RDD[(ArrayBuffer[Int], Long)], RDD[(ArrayBuffer[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
+    (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
     val prefixAndFrequentItemAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
     val patternAndCounts = prefixAndFrequentItemAndCounts
-      .map { case ((prefix, item), count) => (prefix :+ item, count) }
+      .map { case ((prefix, item), count) => (item :: prefix, count) }
     val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts
       .keys
       .groupByKey()
@@ -201,23 +200,12 @@
         frequentNextItems.flatMap { item =>
           val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix)
           if (suffix.isEmpty) None
-          else Some(prefix :+ item, suffix)
+          else Some(item :: prefix, suffix)
         }
       }
     (patternAndCounts, nextPrefixSuffixPairs)
   }

-  /**
-   * Get the frequent prefixes and suffix pairs.
-   * @param frequentPrefixes frequent prefixes
-   * @param sequences sequences data
-   * @return prefixes and suffix pairs.
-   */
-  private def getPrefixSuffixPairs(
-      frequentPrefixes: Array[Int],
-      sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = {
-  }
-
   /**
    * calculate the patterns in local.
    * @param minCount the absolute minimum count
@@ -226,13 +214,13 @@
    */
   private def getPatternsInLocal(
       minCount: Long,
-      data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(ArrayBuffer[Int], Long)] = {
+      data: RDD[(List[Int], Array[Array[Int]])]): RDD[(List[Int], Long)] = {
     data.flatMap {
-      case (prefix, projDB) =>
-        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
-          .map { case (pattern: List[Int], count: Long) =>
-            (pattern.toArray.reverse.to[ArrayBuffer], count)
-          }
+      case (prefix, projDB) =>
+        LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
+          .map { case (pattern: List[Int], count: Long) =>
+            (pattern.reverse, count)
+          }
     }
   }
 }
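
Growing prefixes by prepending (item :: prefix) leaves the newest item at the head, so a single reverse at the end is enough to restore left-to-right order; this is the usual build-then-reverse idiom for immutable lists, and appears to be why getPatternsInLocal can now return pattern.reverse directly instead of converting through arrays and ArrayBuffer. A standalone sketch of the idiom (not the library code):

    object BuildThenReverseSketch {
      def main(args: Array[String]): Unit = {
        val items = Seq(1, 2, 3, 4)
        // Grow by cheap prepends that share tails...
        val reversedPattern = items.foldLeft(List.empty[Int])((acc, item) => item :: acc)
        println(reversedPattern)          // List(4, 3, 2, 1)
        // ...then reverse once to restore the original order.
        println(reversedPattern.reverse)  // List(1, 2, 3, 4)
      }
    }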
