diff --git a/.gitignore b/.gitignore
index bb3af66..1903ccd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@
 runtime
 *.war
 *.ear
 KPIDomain.java
-
+*.train
+*.validate
diff --git a/logfile/netflix/smallnetflix_mm.train.zip b/logfile/netflix/smallnetflix_mm.train.zip
new file mode 100644
index 0000000..8ffb485
Binary files /dev/null and b/logfile/netflix/smallnetflix_mm.train.zip differ
diff --git a/logfile/netflix/smallnetflix_mm.validate.zip b/logfile/netflix/smallnetflix_mm.validate.zip
new file mode 100644
index 0000000..49ae0ad
Binary files /dev/null and b/logfile/netflix/smallnetflix_mm.validate.zip differ
diff --git a/logfile/small.csv b/logfile/small.csv
new file mode 100644
index 0000000..30a25af
--- /dev/null
+++ b/logfile/small.csv
@@ -0,0 +1,21 @@
+1,101,5.0
+1,102,3.0
+1,103,2.5
+2,101,2.0
+2,102,2.5
+2,103,5.0
+2,104,2.0
+3,101,2.0
+3,104,4.0
+3,105,4.5
+3,107,5.0
+4,101,5.0
+4,103,3.0
+4,104,4.5
+4,106,4.0
+5,101,4.0
+5,102,3.0
+5,103,2.0
+5,104,4.0
+5,105,3.5
+5,106,4.0
\ No newline at end of file
diff --git a/src/main/java/org/conan/myhadoop/hdfs/HdfsDAO.java b/src/main/java/org/conan/myhadoop/hdfs/HdfsDAO.java
new file mode 100644
index 0000000..d4aeba1
--- /dev/null
+++ b/src/main/java/org/conan/myhadoop/hdfs/HdfsDAO.java
@@ -0,0 +1,141 @@
+package org.conan.myhadoop.hdfs;
+
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+
+public class HdfsDAO {
+
+    private static final String HDFS = "hdfs://192.168.1.210:9000/";
+
+    public HdfsDAO(Configuration conf) {
+        this(HDFS, conf);
+    }
+
+    public HdfsDAO(String hdfs, Configuration conf) {
+        this.hdfsPath = hdfs;
+        this.conf = conf;
+    }
+
+    private String hdfsPath;
+    private Configuration conf;
+
+    public static void main(String[] args) throws IOException {
+        JobConf conf = config();
+        HdfsDAO hdfs = new HdfsDAO(conf);
+        hdfs.copyFile("datafile/item.csv", "/tmp/new");
+        hdfs.ls("/tmp/new");
+    }
+
+    public static JobConf config() {
+        JobConf conf = new JobConf(HdfsDAO.class);
+        conf.setJobName("HdfsDAO");
+        conf.addResource("classpath:/hadoop/core-site.xml");
+        conf.addResource("classpath:/hadoop/hdfs-site.xml");
+        conf.addResource("classpath:/hadoop/mapred-site.xml");
+        return conf;
+    }
+
+    public void mkdirs(String folder) throws IOException {
+        Path path = new Path(folder);
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        if (!fs.exists(path)) {
+            fs.mkdirs(path);
+            System.out.println("Create: " + folder);
+        }
+        fs.close();
+    }
+
+    public void rmr(String folder) throws IOException {
+        Path path = new Path(folder);
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        fs.deleteOnExit(path);
+        System.out.println("Delete: " + folder);
+        fs.close();
+    }
+
+    public void ls(String folder) throws IOException {
+        Path path = new Path(folder);
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        FileStatus[] list = fs.listStatus(path);
+        System.out.println("ls: " + folder);
+        System.out.println("==========================================================");
+        for (FileStatus f : list) {
+            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
+        }
+        System.out.println("==========================================================");
+        fs.close();
+    }
+
+    public void createFile(String file, String content) throws IOException {
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        byte[] buff = content.getBytes();
+        FSDataOutputStream os = null;
+        try {
+            os = fs.create(new Path(file));
+            os.write(buff, 0, buff.length);
+            System.out.println("Create: " + file);
+        } finally {
+            if (os != null)
+                os.close();
+        }
+        fs.close();
+    }
+
+    public void copyFile(String local, String remote) throws IOException {
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        fs.copyFromLocalFile(new Path(local), new Path(remote));
+        System.out.println("copy from: " + local + " to " + remote);
+        fs.close();
+    }
+
+    public void download(String remote, String local) throws IOException {
+        Path path = new Path(remote);
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        fs.copyToLocalFile(path, new Path(local));
+        System.out.println("download: from " + remote + " to " + local);
+        fs.close();
+    }
+
+    public void cat(String remoteFile) throws IOException {
+        Path path = new Path(remoteFile);
+        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
+        FSDataInputStream fsdis = null;
+        System.out.println("cat: " + remoteFile);
+        try {
+            fsdis = fs.open(path);
+            IOUtils.copyBytes(fsdis, System.out, 4096, false);
+        } finally {
+            IOUtils.closeStream(fsdis);
+            fs.close();
+        }
+    }
+
+    public void location() throws IOException {
+        // String folder = hdfsPath + "create/";
+        // String file = "t2.txt";
+        // FileSystem fs = FileSystem.get(URI.create(hdfsPath), new
+        // Configuration());
+        // FileStatus f = fs.getFileStatus(new Path(folder + file));
+        // BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
+        //
+        // System.out.println("File Location: " + folder + file);
+        // for (BlockLocation bl : list) {
+        // String[] hosts = bl.getHosts();
+        // for (String host : hosts) {
+        // System.out.println("host:" + host);
+        // }
+        // }
+        // fs.close();
+    }
+
+}
diff --git a/src/main/java/org/conan/myhadoop/recommand/Recommand.java b/src/main/java/org/conan/myhadoop/recommand/Recommand.java
new file mode 100644
index 0000000..1e2a562
--- /dev/null
+++ b/src/main/java/org/conan/myhadoop/recommand/Recommand.java
@@ -0,0 +1,46 @@
+package org.conan.myhadoop.recommand;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.mapred.JobConf;
+
+public class Recommand {
+
+    public static final String HDFS = "hdfs://192.168.1.210:9000";
+    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
+
+    public static void main(String[] args) throws Exception {
+        Map<String, String> path = new HashMap<String, String>();
+        path.put("data", "logfile/small.csv");
+        path.put("Step1Input", HDFS + "/user/hdfs/recommand");
+        path.put("Step1Output", path.get("Step1Input") + "/step1");
+        path.put("Step2Input", path.get("Step1Output"));
+        path.put("Step2Output", path.get("Step1Input") + "/step2");
+        path.put("Step3Input1", path.get("Step1Output"));
+        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
+        path.put("Step3Input2", path.get("Step2Output"));
+        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
+        path.put("Step4Input1", path.get("Step3Output1"));
+        path.put("Step4Input2", path.get("Step3Output2"));
+        path.put("Step4Output", path.get("Step1Input") + "/step4");
+
+        Step1.run(path);
+        Step2.run(path);
+        Step3.run1(path);
+        Step3.run2(path);
+        Step4.run(path);
+        System.exit(0);
+    }
+
+    public static JobConf config() {
+        JobConf conf = new JobConf(Recommand.class);
+        conf.setJobName("Recommand");
+        conf.addResource("classpath:/hadoop/core-site.xml");
+        conf.addResource("classpath:/hadoop/hdfs-site.xml");
+        conf.addResource("classpath:/hadoop/mapred-site.xml");
+        return conf;
+    }
+
+}
diff --git a/src/main/java/org/conan/myhadoop/recommand/Step1.java b/src/main/java/org/conan/myhadoop/recommand/Step1.java
new file mode 100644
index 0000000..0590577
--- /dev/null
+++ b/src/main/java/org/conan/myhadoop/recommand/Step1.java
@@ -0,0 +1,89 @@
+package org.conan.myhadoop.recommand;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.conan.myhadoop.hdfs.HdfsDAO;
+
+public class Step1 {
+
+    public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> {
+        private final static IntWritable k = new IntWritable();
+        private final static Text v = new Text();
+
+        @Override
+        public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
+            String[] tokens = Recommand.DELIMITER.split(value.toString());
+            int userID = Integer.parseInt(tokens[0]);
+            String itemID = tokens[1];
+            String pref = tokens[2];
+            k.set(userID);
+            v.set(itemID + ":" + pref);
+            output.collect(k, v);
+        }
+    }
+
+    public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
+        private final static Text v = new Text();
+
+        @Override
+        public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
+            StringBuilder sb = new StringBuilder();
+            while (values.hasNext()) {
+                sb.append("," + values.next());
+            }
+            v.set(sb.toString().replaceFirst(",", ""));
+            output.collect(key, v);
+        }
+    }
+
+    public static void run(Map<String, String> path) throws IOException {
+        JobConf conf = Recommand.config();
+
+        String input = path.get("Step1Input");
+        String output = path.get("Step1Output");
+
+        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
+        hdfs.rmr(input);
+        hdfs.mkdirs(input);
+        hdfs.copyFile(path.get("data"), input);
+
+        conf.setMapOutputKeyClass(IntWritable.class);
+        conf.setMapOutputValueClass(Text.class);
+
+        conf.setOutputKeyClass(IntWritable.class);
+        conf.setOutputValueClass(Text.class);
+
+        conf.setMapperClass(Step1_ToItemPreMapper.class);
+        conf.setCombinerClass(Step1_ToUserVectorReducer.class);
+        conf.setReducerClass(Step1_ToUserVectorReducer.class);
+
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+
+        FileInputFormat.setInputPaths(conf, new Path(input));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        RunningJob job = JobClient.runJob(conf);
+        while (!job.isComplete()) {
+            job.waitForCompletion();
+        }
+    }
+
+}
diff --git a/src/main/java/org/conan/myhadoop/recommand/Step2.java b/src/main/java/org/conan/myhadoop/recommand/Step2.java
new file mode 100644
index 0000000..c72e207
--- /dev/null
+++ b/src/main/java/org/conan/myhadoop/recommand/Step2.java
@@ -0,0 +1,85 @@
+package org.conan.myhadoop.recommand;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.conan.myhadoop.hdfs.HdfsDAO;
+
+public class Step2 {
+    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+        private final static Text k = new Text();
+        private final static IntWritable v = new IntWritable(1);
+
+        @Override
+        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+            String[] tokens = Recommand.DELIMITER.split(values.toString());
+            for (int i = 1; i < tokens.length; i++) {
+                String itemID = tokens[i].split(":")[0];
+                for (int j = 1; j < tokens.length; j++) {
+                    String itemID2 = tokens[j].split(":")[0];
+                    k.set(itemID + ":" + itemID2);
+                    output.collect(k, v);
+                }
+            }
+        }
+    }
+
+    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
+        private IntWritable result = new IntWritable();
+
+        @Override
+        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+            int sum = 0;
+            while (values.hasNext()) {
+                sum += values.next().get();
+            }
+            result.set(sum);
+            output.collect(key, result);
+        }
+    }
+
+    public static void run(Map<String, String> path) throws IOException {
+        JobConf conf = Recommand.config();
+
+        String input = path.get("Step2Input");
+        String output = path.get("Step2Output");
+
+        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
+        hdfs.rmr(output);
+
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(IntWritable.class);
+
+        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
+        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
+        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
+
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+
+        FileInputFormat.setInputPaths(conf, new Path(input));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        RunningJob job = JobClient.runJob(conf);
+        while (!job.isComplete()) {
+            job.waitForCompletion();
+        }
+    }
+}
diff --git a/src/main/java/org/conan/myhadoop/recommand/Step3.java b/src/main/java/org/conan/myhadoop/recommand/Step3.java
new file mode 100644
index 0000000..4036dc2
--- /dev/null
+++ b/src/main/java/org/conan/myhadoop/recommand/Step3.java
@@ -0,0 +1,109 @@
+package org.conan.myhadoop.recommand;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.conan.myhadoop.hdfs.HdfsDAO;
+
+public class Step3 {
+
+    public static class Step31_UserVectorSplitterMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
+        private final static IntWritable k = new IntWritable();
+        private final static Text v = new Text();
+
+        @Override
+        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
+            String[] tokens = Recommand.DELIMITER.split(values.toString());
+            for (int i = 1; i < tokens.length; i++) {
+                String[] vector = tokens[i].split(":");
+                int itemID = Integer.parseInt(vector[0]);
+                String pref = vector[1];
+
+                k.set(itemID);
+                v.set(tokens[0] + ":" + pref);
+                output.collect(k, v);
+            }
+        }
+    }
+
+    public static void run1(Map<String, String> path) throws IOException {
+        JobConf conf = Recommand.config();
+
+        String input = path.get("Step3Input1");
+        String output = path.get("Step3Output1");
+
+        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
+        hdfs.rmr(output);
+
+        conf.setOutputKeyClass(IntWritable.class);
+        conf.setOutputValueClass(Text.class);
+
+        conf.setMapperClass(Step31_UserVectorSplitterMapper.class);
+
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+
+        FileInputFormat.setInputPaths(conf, new Path(input));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        RunningJob job = JobClient.runJob(conf);
+        while (!job.isComplete()) {
+            job.waitForCompletion();
+        }
+    }
+
+    public static class Step32_CooccurrenceColumnWrapperMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
+        private final static Text k = new Text();
+        private final static IntWritable v = new IntWritable();
+
+        @Override
+        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
+            String[] tokens = Recommand.DELIMITER.split(values.toString());
+            k.set(tokens[0]);
+            v.set(Integer.parseInt(tokens[1]));
+            output.collect(k, v);
+        }
+    }
+
+    public static void run2(Map<String, String> path) throws IOException {
+        JobConf conf = Recommand.config();
+
+        String input = path.get("Step3Input2");
+        String output = path.get("Step3Output2");
+
+        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
+        hdfs.rmr(output);
+
+        conf.setOutputKeyClass(Text.class);
+        conf.setOutputValueClass(IntWritable.class);
+
+        conf.setMapperClass(Step32_CooccurrenceColumnWrapperMapper.class);
+
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+
+        FileInputFormat.setInputPaths(conf, new Path(input));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        RunningJob job = JobClient.runJob(conf);
+        while (!job.isComplete()) {
+            job.waitForCompletion();
+        }
+    }
+
+}
diff --git a/src/main/java/org/conan/myhadoop/recommand/Step4.java b/src/main/java/org/conan/myhadoop/recommand/Step4.java
new file mode 100644
index 0000000..2ce99dc
--- /dev/null
+++ b/src/main/java/org/conan/myhadoop/recommand/Step4.java
@@ -0,0 +1,163 @@
+package org.conan.myhadoop.recommand;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.conan.myhadoop.hdfs.HdfsDAO;
+
+public class Step4 {
+
+    public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
+        private final static IntWritable k = new IntWritable();
+        private final static Text v = new Text();
+
+        private final static Map<Integer, List<Cooccurrence>> cooccurrenceMatrix = new HashMap<Integer, List<Cooccurrence>>();
+
+        @Override
+        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
+            String[] tokens = Recommand.DELIMITER.split(values.toString());
+
+            String[] v1 = tokens[0].split(":");
+            String[] v2 = tokens[1].split(":");
+
+            if (v1.length > 1) { // cooccurrence
+                int itemID1 = Integer.parseInt(v1[0]);
+                int itemID2 = Integer.parseInt(v1[1]);
+                int num = Integer.parseInt(tokens[1]);
+
+                List<Cooccurrence> list = null;
+                if (!cooccurrenceMatrix.containsKey(itemID1)) {
+                    list = new ArrayList<Cooccurrence>();
+                } else {
+                    list = cooccurrenceMatrix.get(itemID1);
+                }
+                list.add(new Cooccurrence(itemID1, itemID2, num));
+                cooccurrenceMatrix.put(itemID1, list);
+            }
+
+            if (v2.length > 1) { // userVector
+                int itemID = Integer.parseInt(tokens[0]);
+                int userID = Integer.parseInt(v2[0]);
+                double pref = Double.parseDouble(v2[1]);
+                k.set(userID);
+                for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
+                    v.set(co.getItemID2() + "," + pref * co.getNum());
+                    output.collect(k, v);
+                }
+
+            }
+        }
+    }
+
+    public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
+        private final static Text v = new Text();
+
+        @Override
+        public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
+            Map<String, Double> result = new HashMap<String, Double>();
+            while (values.hasNext()) {
+                String[] str = values.next().toString().split(",");
+                if (result.containsKey(str[0])) {
+                    result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
+                } else {
+                    result.put(str[0], Double.parseDouble(str[1]));
+                }
+            }
+            Iterator<String> iter = result.keySet().iterator();
+            while (iter.hasNext()) {
+                String itemID = iter.next();
+                double score = result.get(itemID);
+                v.set(itemID + "," + score);
+                output.collect(key, v);
+            }
+        }
+    }
+
+    public static void run(Map<String, String> path) throws IOException {
+        JobConf conf = Recommand.config();
+
+        String input1 = path.get("Step4Input1");
+        String input2 = path.get("Step4Input2");
+        String output = path.get("Step4Output");
+
+        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
+        hdfs.rmr(output);
+
+        conf.setOutputKeyClass(IntWritable.class);
+        conf.setOutputValueClass(Text.class);
+
+        conf.setMapperClass(Step4_PartialMultiplyMapper.class);
+        conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
+        conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);
+
+        conf.setInputFormat(TextInputFormat.class);
+        conf.setOutputFormat(TextOutputFormat.class);
+
+        FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
+        FileOutputFormat.setOutputPath(conf, new Path(output));
+
+        RunningJob job = JobClient.runJob(conf);
+        while (!job.isComplete()) {
+            job.waitForCompletion();
+        }
+    }
+
+}
+
+class Cooccurrence {
+    private int itemID1;
+    private int itemID2;
+    private int num;
+
+    public Cooccurrence(int itemID1, int itemID2, int num) {
+        super();
+        this.itemID1 = itemID1;
+        this.itemID2 = itemID2;
+        this.num = num;
+    }
+
+    public int getItemID1() {
+        return itemID1;
+    }
+
+    public void setItemID1(int itemID1) {
+        this.itemID1 = itemID1;
+    }
+
+    public int getItemID2() {
+        return itemID2;
+    }
+
+    public void setItemID2(int itemID2) {
+        this.itemID2 = itemID2;
+    }
+
+    public int getNum() {
+        return num;
+    }
+
+    public void setNum(int num) {
+        this.num = num;
+    }
+
+}
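Reviewer note: the scores Step4 emits can be spot-checked by hand against logfile/small.csv, since each score is just the sum of co-occurrence count times preference over the items a user has already rated. The sketch below is a minimal, in-memory restatement of that Step2 + Step4 arithmetic for one user; the class name ScoreCheck and the hard-coded ratings subset are illustrative only and are not part of the patch.

import java.util.HashMap;
import java.util.Map;

// Illustrative only (not part of the patch): recomputes, in memory, what
// Step2 (co-occurrence counting) and Step4 (partial multiply + aggregate)
// would produce for one user of logfile/small.csv.
public class ScoreCheck {

    public static void main(String[] args) {
        // user -> (item -> preference), copied from logfile/small.csv (users 1-3 only)
        Map<Integer, Map<Integer, Double>> users = new HashMap<Integer, Map<Integer, Double>>();
        rate(users, 1, 101, 5.0); rate(users, 1, 102, 3.0); rate(users, 1, 103, 2.5);
        rate(users, 2, 101, 2.0); rate(users, 2, 102, 2.5); rate(users, 2, 103, 5.0); rate(users, 2, 104, 2.0);
        rate(users, 3, 101, 2.0); rate(users, 3, 104, 4.0); rate(users, 3, 105, 4.5); rate(users, 3, 107, 5.0);

        // Step2 equivalent: count how often each ordered item pair appears inside one user's vector
        Map<String, Integer> cooccurrence = new HashMap<String, Integer>();
        for (Map<Integer, Double> vector : users.values()) {
            for (Integer itemA : vector.keySet()) {
                for (Integer itemB : vector.keySet()) {
                    String pair = itemA + ":" + itemB;
                    Integer count = cooccurrence.get(pair);
                    cooccurrence.put(pair, count == null ? 1 : count + 1);
                }
            }
        }

        // Step4 equivalent: score(user, itemB) = sum over rated itemA of count(itemA:itemB) * pref(user, itemA)
        int userID = 3;
        Map<Integer, Double> scores = new HashMap<Integer, Double>();
        for (Map.Entry<Integer, Double> rated : users.get(userID).entrySet()) {
            for (Map.Entry<String, Integer> co : cooccurrence.entrySet()) {
                String[] pair = co.getKey().split(":");
                if (Integer.parseInt(pair[0]) == rated.getKey().intValue()) {
                    Integer itemB = Integer.valueOf(pair[1]);
                    Double sum = scores.get(itemB);
                    double part = rated.getValue() * co.getValue();
                    scores.put(itemB, sum == null ? part : sum + part);
                }
            }
        }
        System.out.println("user " + userID + " -> " + scores);
    }

    private static void rate(Map<Integer, Map<Integer, Double>> users, int user, int item, double pref) {
        Map<Integer, Double> vector = users.get(user);
        if (vector == null) {
            vector = new HashMap<Integer, Double>();
            users.put(user, vector);
        }
        vector.put(item, pref);
    }
}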