Commit cb2a4fc

Author: Feynman Liang (committed)
Inline code for readability
1 parent 01c9ae9 commit cb2a4fc

File tree: 1 file changed (+25, -39 lines)

mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

Lines changed: 25 additions & 39 deletions
@@ -97,14 +97,31 @@ class PrefixSpan private (
     if (sequences.getStorageLevel == StorageLevel.NONE) {
       logWarning("Input data is not cached.")
     }
-    val minCount = getMinCount(sequences)
-    val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences)
-    val prefixSuffixPairs = getPrefixSuffixPairs(
-      lengthOnePatternsAndCounts.map(_._1).collect(), sequences)
+
+    // Convert min support to a min number of transactions for this dataset
+    val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
+
+    val itemCounts = sequences
+      .flatMap(_.distinct.map((_, 1L)))
+      .reduceByKey(_ + _)
+      .filter(_._2 >= minCount)
+
+    val prefixSuffixPairs = {
+      val frequentItems = itemCounts.map(_._1).collect()
+      val candidates = sequences.map { p =>
+        p.filter (frequentItems.contains(_) )
+      }
+      candidates.flatMap { x =>
+        frequentItems.map { y =>
+          val sub = LocalPrefixSpan.getSuffix(y, x)
+          (ArrayBuffer(y), sub)
+        }.filter(_._2.nonEmpty)
+      }
+    }
     prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
-    var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2))
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) =
-      splitPrefixSuffixPairs(prefixSuffixPairs)
+
+    var allPatternAndCounts = itemCounts.map(x => (ArrayBuffer(x._1), x._2))
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
     while (largePrefixSuffixPairs.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
@@ -115,6 +132,7 @@ class PrefixSpan private (
       smallPrefixSuffixPairs ++= smallerPairsPart
       allPatternAndCounts ++= nextPatternAndCounts
     }
+
     if (smallPrefixSuffixPairs.count() > 0) {
       val projectedDatabase = smallPrefixSuffixPairs
         .map(x => (x._1.toSeq, x._2))
@@ -189,29 +207,6 @@ class PrefixSpan private (
     (patternAndCounts, nextPrefixSuffixPairs)
   }
 
-  /**
-   * Get the minimum count (sequences count * minSupport).
-   * @param sequences input data set, contains a set of sequences,
-   * @return minimum count,
-   */
-  private def getMinCount(sequences: RDD[Array[Int]]): Long = {
-    if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
-  }
-
-  /**
-   * Generates frequent items by filtering the input data using minimal count level.
-   * @param minCount the absolute minimum count
-   * @param sequences original sequences data
-   * @return array of item and count pair
-   */
-  private def getFreqItemAndCounts(
-      minCount: Long,
-      sequences: RDD[Array[Int]]): RDD[(Int, Long)] = {
-    sequences.flatMap(_.distinct.map((_, 1L)))
-      .reduceByKey(_ + _)
-      .filter(_._2 >= minCount)
-  }
-
   /**
    * Get the frequent prefixes and suffix pairs.
    * @param frequentPrefixes frequent prefixes
@@ -221,15 +216,6 @@ class PrefixSpan private (
   private def getPrefixSuffixPairs(
       frequentPrefixes: Array[Int],
       sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = {
-    val filteredSequences = sequences.map { p =>
-      p.filter (frequentPrefixes.contains(_) )
-    }
-    filteredSequences.flatMap { x =>
-      frequentPrefixes.map { y =>
-        val sub = LocalPrefixSpan.getSuffix(y, x)
-        (ArrayBuffer(y), sub)
-      }.filter(_._2.nonEmpty)
-    }
   }
 
   /**

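Note (not part of the commit): the inlined block above converts minSupport into an absolute minCount and counts each item at most once per sequence before filtering. Below is a minimal sketch of that counting logic on plain Scala collections instead of RDDs; the object name, toy sequences, and minSupport value are hypothetical, chosen only to illustrate the computation.

    // Sketch only: mimics the inlined minCount / itemCounts logic without Spark.
    object ItemCountSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical toy data: three sequences of integer items.
        val sequences: Seq[Array[Int]] = Seq(
          Array(1, 2, 3),
          Array(1, 3, 2),
          Array(2, 4))
        val minSupport = 0.5

        // Convert min support to a minimum number of sequences, as in the diff above.
        val minCount = if (minSupport == 0) 0L else math.ceil(sequences.size * minSupport).toLong

        // Count each item once per sequence (hence .distinct), keep items meeting minCount.
        val itemCounts = sequences
          .flatMap(_.distinct.map((_, 1L)))
          .groupBy(_._1)
          .map { case (item, pairs) => (item, pairs.map(_._2).sum) }
          .filter(_._2 >= minCount)

        println(itemCounts) // with minCount = 2: item 4 is dropped, items 1, 2, 3 survive
      }
    }

Because of the .distinct, a count here means "number of sequences containing the item", which is what makes it directly comparable to the sequence-level minCount threshold.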
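Note (not part of the commit): a similar sketch of the prefix/suffix pair generation that was inlined. LocalPrefixSpan.getSuffix is package-private in MLlib, so the getSuffix helper below is an assumed stand-in (drop everything up to and including the first occurrence of the item); the toy data and object name are again hypothetical.

    // Sketch only: length-1 prefixes paired with their non-empty suffixes, without Spark.
    import scala.collection.mutable.ArrayBuffer

    object PrefixSuffixSketch {
      // Assumed behaviour of the suffix helper, not the actual MLlib implementation.
      def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
        val index = sequence.indexOf(item)
        if (index == -1) Array.empty[Int] else sequence.drop(index + 1)
      }

      def main(args: Array[String]): Unit = {
        val frequentItems = Array(1, 2, 3)
        val sequences: Seq[Array[Int]] = Seq(Array(1, 2, 3), Array(1, 3, 2), Array(2, 4))

        // Keep only frequent items, then pair each frequent item (as a length-1 prefix)
        // with the non-empty suffix that follows it in each filtered sequence.
        val candidates = sequences.map(p => p.filter(frequentItems.contains(_)))
        val prefixSuffixPairs = candidates.flatMap { x =>
          frequentItems.map { y =>
            (ArrayBuffer(y), getSuffix(y, x))
          }.filter(_._2.nonEmpty)
        }

        prefixSuffixPairs.foreach { case (prefix, suffix) =>
          println(s"${prefix.mkString(",")} -> ${suffix.mkString(",")}")
        }
      }
    }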