Step4_update
bsspirit committed Mar 28, 2014
1 parent 4137b66 commit 4c3d781
Showing 5 changed files with 257 additions and 4 deletions.
21 changes: 21 additions & 0 deletions src/main/java/org/conan/myhadoop/recommend/Recommend.java
@@ -5,6 +5,7 @@
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Recommend {

@@ -22,15 +23,35 @@ public static void main(String[] args) throws Exception {
path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
path.put("Step3Input2", path.get("Step2Output"));
path.put("Step3Output2", path.get("Step1Input") + "/step3_2");

path.put("Step4Input1", path.get("Step3Output1"));
path.put("Step4Input2", path.get("Step3Output2"));
path.put("Step4Output", path.get("Step1Input") + "/step4");

path.put("Step5Input1", path.get("Step3Output1"));
path.put("Step5Input2", path.get("Step3Output2"));
path.put("Step5Output", path.get("Step1Input") + "/step5");

path.put("Step6Input", path.get("Step5Output"));
path.put("Step6Output", path.get("Step1Input") + "/step6");



Step1.run(path);
Step2.run(path);
Step3.run1(path);
Step3.run2(path);
Step4.run(path);

Step4_Update.run(path);
Step4_Update2.run(path);


// // hadoop fs -cat /user/hdfs/recommend/step4/part-00000
// JobConf conf = config();
// HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
// hdfs.cat("/user/hdfs/recommend/step4/part-00000");

System.exit(0);
}

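The new Step4_Update classes below call Recommend.config(), Recommend.HDFS, and Recommend.DELIMITER, none of which are visible in this diff. A minimal sketch of what those members presumably look like in Recommend.java (the HDFS URI, the delimiter pattern, and the resource file names are assumptions, not part of this commit; Pattern and JobConf are already imported above):

    // Sketch only; the concrete values below are assumed, not taken from this commit.
    public static final String HDFS = "hdfs://192.168.1.210:9000";    // assumed NameNode address
    public static final Pattern DELIMITER = Pattern.compile("[\t,]"); // splits fields on tab or comma

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommend");
        // assumed cluster configuration files on the classpath
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
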
4 changes: 2 additions & 2 deletions src/main/java/org/conan/myhadoop/recommend/Step2.java
@@ -68,8 +68,8 @@ public static void run(Map<String, String> path) throws IOException {
conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
// conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
// conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
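This change re-enables the combiner and reducer that were previously commented out. The same class can serve as both because the co-occurrence counts are combined by plain summation, which is associative and commutative. Step2_UserVectorToConoccurrenceReducer itself is not shown in this diff; a minimal sketch of what it presumably looks like, given the Text/IntWritable output types configured above and the old mapred API imports already used by Step2.java:

    // Sketch only, written against the old org.apache.hadoop.mapred API used by Step2.
    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get(); // accumulate the co-occurrence count for this item pair
            }
            result.set(sum);
            output.collect(key, result);
        }
    }
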
4 changes: 2 additions & 2 deletions src/main/java/org/conan/myhadoop/recommend/Step4.java
@@ -36,7 +36,7 @@ public static class Step4_PartialMultiplyMapper extends MapReduceBase implements
@Override
public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
String[] tokens = Recommend.DELIMITER.split(values.toString());

String[] v1 = tokens[0].split(":");
String[] v2 = tokens[1].split(":");

@@ -114,7 +114,7 @@ public static void run(Map<String, String> path) throws IOException {

FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
FileOutputFormat.setOutputPath(conf, new Path(output));

RunningJob job = JobClient.runJob(conf);
while (!job.isComplete()) {
job.waitForCompletion();
141 changes: 141 additions & 0 deletions src/main/java/org/conan/myhadoop/recommend/Step4_Update.java
@@ -0,0 +1,141 @@
package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4_Update {

public static class Step4_PartialMultiplyMapper extends Mapper<LongWritable, Text, Text, Text> {

private String flag;// "A" for the co-occurrence matrix or "B" for the rating matrix

@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getParent().getName();// identify which dataset this split comes from

// System.out.println(flag);
}

@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.DELIMITER.split(values.toString());

if (flag.equals("step3_2")) {// 同现矩阵
String[] v1 = tokens[0].split(":");
String itemID1 = v1[0];
String itemID2 = v1[1];
String num = tokens[1];

Text k = new Text(itemID1);
Text v = new Text("A:" + itemID2 + "," + num);

context.write(k, v);
// System.out.println(k.toString() + " " + v.toString());

} else if (flag.equals("step3_1")) {// rating matrix
String[] v2 = tokens[1].split(":");
String itemID = tokens[0];
String userID = v2[0];
String pref = v2[1];

Text k = new Text(itemID);
Text v = new Text("B:" + userID + "," + pref);

context.write(k, v);
// System.out.println(k.toString() + " " + v.toString());
}
}

}

public static class Step4_AggregateReducer extends Reducer<Text, Text, Text, Text> {

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
System.out.println(key.toString() + ":");

Map<String, String> mapA = new HashMap<String, String>();
Map<String, String> mapB = new HashMap<String, String>();

for (Text line : values) {
String val = line.toString();
System.out.println(val);

if (val.startsWith("A:")) {
String[] kv = Recommend.DELIMITER.split(val.substring(2));
mapA.put(kv[0], kv[1]);

} else if (val.startsWith("B:")) {
String[] kv = Recommend.DELIMITER.split(val.substring(2));
mapB.put(kv[0], kv[1]);

}
}

double result = 0;
Iterator<String> iter = mapA.keySet().iterator();
while (iter.hasNext()) {
String mapk = iter.next();// itemID

int num = Integer.parseInt(mapA.get(mapk));
Iterator<String> iterb = mapB.keySet().iterator();
while (iterb.hasNext()) {
String mapkb = iterb.next();// userID
double pref = Double.parseDouble(mapB.get(mapkb));
result = num * pref;// multiplication step of the matrix product

Text k = new Text(mapkb);
Text v = new Text(mapk + "," + result);
context.write(k, v);
System.out.println(k.toString() + " " + v.toString());
}
}
}
}

public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = Recommend.config();

String input1 = path.get("Step5Input1");
String input2 = path.get("Step5Input2");
String output = path.get("Step5Output");

HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);

Job job = new Job(conf);
job.setJarByClass(Step4_Update.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(Step4_Update.Step4_PartialMultiplyMapper.class);
job.setReducerClass(Step4_Update.Step4_AggregateReducer.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
FileOutputFormat.setOutputPath(job, new Path(output));

job.waitForCompletion(true);
}

}
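A hypothetical trace of this step (the values are made up for illustration, following the formats the mapper parses): if step3_2 contains the co-occurrence record "101:102" with count 3, and step3_1 contains the rating record for item 101 with value "5:4.0" (user 5, preference 4.0), the mapper emits key 101 with value "A:102,3" for the first and key 101 with value "B:5,4.0" for the second. The reducer collects both under key 101 and writes key "5" with value "102,12.0", that is, one partial product 3 * 4.0 for user 5 and item 102. Summing these partial products per user/item pair is left to Step4_Update2 below.
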
91 changes: 91 additions & 0 deletions src/main/java/org/conan/myhadoop/recommend/Step4_Update2.java
@@ -0,0 +1,91 @@
package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4_Update2 {

public static class Step4_RecommendMapper extends Mapper<LongWritable, Text, Text, Text> {

@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.DELIMITER.split(values.toString());
Text k = new Text(tokens[0]);
Text v = new Text(tokens[1]+","+tokens[2]);
context.write(k, v);
}
}

public static class Step4_RecommendReducer extends Reducer<Text, Text, Text, Text> {

@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
System.out.println(key.toString() + ":");
Map<String, Double> map = new HashMap<String, Double>();// accumulated recommendation scores

for (Text line : values) {
System.out.println(line.toString());
String[] tokens = Recommend.DELIMITER.split(line.toString());
String itemID = tokens[0];
Double score = Double.parseDouble(tokens[1]);

if (map.containsKey(itemID)) {
map.put(itemID, map.get(itemID) + score);// summation step of the matrix product
} else {
map.put(itemID, score);
}
}

Iterator<String> iter = map.keySet().iterator();
while (iter.hasNext()) {
String itemID = iter.next();
double score = map.get(itemID);
Text v = new Text(itemID + "," + score);
context.write(key, v);
}
}
}

public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = Recommend.config();

String input = path.get("Step6Input");
String output = path.get("Step6Output");

HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
hdfs.rmr(output);

Job job = new Job(conf);
job.setJarByClass(Step4_Update2.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setMapperClass(Step4_Update2.Step4_RecommendMapper.class);
job.setReducerClass(Step4_Update2.Step4_RecommendReducer.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

FileInputFormat.setInputPaths(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));

job.waitForCompletion(true);
}

}
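Each output line of Step4_RecommendReducer is a user ID followed by an "itemID,score" pair, where the score is the summed matrix product for that user and item. To inspect the result, the commented-out HdfsDAO.cat() call in Recommend.java can be adapted to the step6 directory; a minimal sketch (the part file name is an assumption, since jobs written against the new mapreduce API typically name their output part-r-00000):

    // hadoop fs -cat /user/hdfs/recommend/step6/part-r-00000
    JobConf conf = Recommend.config();
    HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
    hdfs.cat("/user/hdfs/recommend/step6/part-r-00000");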
