[SPARK-5939][MLLib] make FPGrowth example app take parameters
Add parameter parsing to the FPGrowth example app in Scala and Java.
A sample data file is also added in the data/mllib folder.
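
For reference, the updated doc comments in both files give the new invocations (scopt-style flags for the Scala app, positional arguments for the Java app):

  ./bin/run-example mllib.FPGrowthExample \
    --minSupport 0.8 --numPartition 2 ./data/mllib/sample_fpgrowth.txt
  ./bin/run-example mllib.JavaFPGrowthExample ./data/mllib/sample_fpgrowth.txt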

Author: Jacky Li <jacky.likun@huawei.com>

Closes #4714 from jackylk/parameter and squashes the following commits:

8c478b3 [Jacky Li] fix according to comments
3bb74f6 [Jacky Li] make FPGrowth example app take parameters
f0e4d10 [Jacky Li] make FPGrowth example app take parameters
jackylk authored and mengxr committed Feb 23, 2015
1 parent 242d495 commit 651a1c0
Showing 3 changed files with 81 additions and 27 deletions.
6 changes: 6 additions & 0 deletions data/mllib/sample_fpgrowth.txt
@@ -0,0 +1,6 @@
+r z h k p
+z y x w v u t s
+s x o n r
+x z y m t s q e
+z
+x z y r q t p
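
As a quick sanity check on what minSupport means against this file, a spark-shell sketch (a sketch only; sc is the shell's predefined SparkContext, and the path assumes the shell is started from the Spark source root):

  // Per-item support = fraction of the 6 transactions containing the item.
  val transactions = sc.textFile("data/mllib/sample_fpgrowth.txt").map(_.split(" "))
  val n = transactions.count()  // 6
  val support = transactions.flatMap(_.distinct).countByValue().mapValues(_.toDouble / n)
  // "z" appears in 5 of the 6 transactions (support ~0.83), so the itemset {z}
  // survives even the stricter minSupport = 0.8 shown in the Scala usage string.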
43 changes: 30 additions & 13 deletions examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java
@@ -25,32 +25,49 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.mllib.fpm.FPGrowth;
 import org.apache.spark.mllib.fpm.FPGrowthModel;
 
 /**
  * Java example for mining frequent itemsets using FP-growth.
+ * Example usage: ./bin/run-example mllib.JavaFPGrowthExample ./data/mllib/sample_fpgrowth.txt
  */
 public class JavaFPGrowthExample {
 
   public static void main(String[] args) {
+    String inputFile;
+    double minSupport = 0.3;
+    int numPartition = -1;
+    if (args.length < 1) {
+      System.err.println(
+        "Usage: JavaFPGrowth <input_file> [minSupport] [numPartition]");
+      System.exit(1);
+    }
+    inputFile = args[0];
+    if (args.length >= 2) {
+      minSupport = Double.parseDouble(args[1]);
+    }
+    if (args.length >= 3) {
+      numPartition = Integer.parseInt(args[2]);
+    }
+
     SparkConf sparkConf = new SparkConf().setAppName("JavaFPGrowthExample");
     JavaSparkContext sc = new JavaSparkContext(sparkConf);
 
-    // TODO: Read a user-specified input file.
-    @SuppressWarnings("unchecked")
-    JavaRDD<ArrayList<String>> transactions = sc.parallelize(Lists.newArrayList(
-      Lists.newArrayList("r z h k p".split(" ")),
-      Lists.newArrayList("z y x w v u t s".split(" ")),
-      Lists.newArrayList("s x o n r".split(" ")),
-      Lists.newArrayList("x z y m t s q e".split(" ")),
-      Lists.newArrayList("z".split(" ")),
-      Lists.newArrayList("x z y r q t p".split(" "))), 2);
+    JavaRDD<ArrayList<String>> transactions = sc.textFile(inputFile).map(
+      new Function<String, ArrayList<String>>() {
+        @Override
+        public ArrayList<String> call(String s) {
+          return Lists.newArrayList(s.split(" "));
+        }
+      }
+    );
 
-    FPGrowth fpg = new FPGrowth()
-      .setMinSupport(0.3);
-    FPGrowthModel<String> model = fpg.run(transactions);
+    FPGrowthModel<String> model = new FPGrowth()
+      .setMinSupport(minSupport)
+      .setNumPartitions(numPartition)
+      .run(transactions);
 
     for (FPGrowth.FreqItemset<String> s: model.freqItemsets().toJavaRDD().collect()) {
       System.out.println("[" + Joiner.on(",").join(s.javaItems()) + "], " + s.freq());
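
Both examples leave numPartition at -1 when the argument is omitted and pass it straight to setNumPartitions. MLlib's FPGrowth treats a non-positive value as "fall back to the input RDD's own partition count", so omitting the argument preserves the old behavior; roughly (a sketch of that convention, not code from this commit):

  // Fallback convention the -1 default relies on (assumed from MLlib's FPGrowth.run).
  def effectiveNumPartitions(requested: Int, inputPartitions: Int): Int =
    if (requested > 0) requested else inputPartitions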
59 changes: 45 additions & 14 deletions examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala
@@ -17,30 +17,61 @@
 
 package org.apache.spark.examples.mllib
 
+import scopt.OptionParser
+
 import org.apache.spark.mllib.fpm.FPGrowth
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 
 /**
  * Example for mining frequent itemsets using FP-growth.
+ * Example usage: ./bin/run-example mllib.FPGrowthExample \
+ *   --minSupport 0.8 --numPartition 2 ./data/mllib/sample_fpgrowth.txt
  */
 object FPGrowthExample {
 
+  case class Params(
+    input: String = null,
+    minSupport: Double = 0.3,
+    numPartition: Int = -1) extends AbstractParams[Params]
+
   def main(args: Array[String]) {
-    val conf = new SparkConf().setAppName("FPGrowthExample")
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("FPGrowthExample") {
+      head("FPGrowth: an example FP-growth app.")
+      opt[Double]("minSupport")
+        .text(s"minimal support level, default: ${defaultParams.minSupport}")
+        .action((x, c) => c.copy(minSupport = x))
+      opt[Int]("numPartition")
+        .text(s"number of partition, default: ${defaultParams.numPartition}")
+        .action((x, c) => c.copy(numPartition = x))
+      arg[String]("<input>")
+        .text("input paths to input data set, whose file format is that each line " +
+          "contains a transaction with each item in String and separated by a space")
+        .required()
+        .action((x, c) => c.copy(input = x))
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    }.getOrElse {
+      sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"FPGrowthExample with $params")
     val sc = new SparkContext(conf)
+    val transactions = sc.textFile(params.input).map(_.split(" ")).cache()
+
+    println(s"Number of transactions: ${transactions.count()}")
+
+    val model = new FPGrowth()
+      .setMinSupport(params.minSupport)
+      .setNumPartitions(params.numPartition)
+      .run(transactions)
 
-    // TODO: Read a user-specified input file.
-    val transactions = sc.parallelize(Seq(
-      "r z h k p",
-      "z y x w v u t s",
-      "s x o n r",
-      "x z y m t s q e",
-      "z",
-      "x z y r q t p").map(_.split(" ")), numSlices = 2)
-
-    val fpg = new FPGrowth()
-      .setMinSupport(0.3)
-    val model = fpg.run(transactions)
+    println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")
 
     model.freqItemsets.collect().foreach { itemset =>
       println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
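
To exercise the same pipeline interactively, a spark-shell sketch mirroring run(params) above (sc is the shell's predefined SparkContext; the path assumes the Spark source root):

  import org.apache.spark.mllib.fpm.FPGrowth

  val transactions = sc.textFile("data/mllib/sample_fpgrowth.txt").map(_.split(" ")).cache()
  val model = new FPGrowth()
    .setMinSupport(0.8)
    .setNumPartitions(2)
    .run(transactions)
  model.freqItemsets.collect().foreach { itemset =>
    println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
  }
  // With minSupport = 0.8 over 6 transactions, an itemset must appear in at least
  // ceil(0.8 * 6) = 5 of them; only {z}, with frequency 5, should qualify.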
