forked from bsspirit/maven_hadoop_template
Showing 10 changed files with 656 additions and 1 deletion.
.gitignore
@@ -13,6 +13,7 @@ runtime
*.war
*.ear
KPIDomain.java

*.train
*.validate
Binary file not shown.
Binary file not shown.
small.csv (sample ratings data; referenced as logfile/small.csv in Recommand.java below)
@@ -0,0 +1,21 @@
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.0
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
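Each row is a userID,itemID,rating triple (so the first row says user 1 rated item 101 at 5.0), matching the three fields that Step1_ToItemPreMapper parses below.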
HdfsDAO.java (package org.conan.myhadoop.hdfs)
@@ -0,0 +1,141 @@
package org.conan.myhadoop.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

// Thin DAO over the HDFS client API: ls, mkdir, rm, create, put, get, cat.
public class HdfsDAO {

    private static final String HDFS = "hdfs://192.168.1.210:9000/";

    private String hdfsPath;
    private Configuration conf;

    public HdfsDAO(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsDAO(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(conf);
        hdfs.copyFile("datafile/item.csv", "/tmp/new");
        hdfs.ls("/tmp/new");
    }

    // Build a JobConf from the Hadoop client configs on the classpath.
    public static JobConf config() {
        JobConf conf = new JobConf(HdfsDAO.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    // Create the folder if it does not already exist.
    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    // Remove the path; deleteOnExit performs the delete when fs is closed below.
    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    // List a folder's entries with their type and size.
    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    // Create a file and write the given string content to it.
    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    // Upload a local file to HDFS.
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    // Download an HDFS file to the local filesystem.
    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from " + remote + " to " + local);
        fs.close();
    }

    // Stream an HDFS file to stdout.
    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
    }

    public void location() throws IOException {
        // String folder = hdfsPath + "create/";
        // String file = "t2.txt";
        // FileSystem fs = FileSystem.get(URI.create(hdfsPath), new
        // Configuration());
        // FileStatus f = fs.getFileStatus(new Path(folder + file));
        // BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        //
        // System.out.println("File Location: " + folder + file);
        // for (BlockLocation bl : list) {
        // String[] hosts = bl.getHosts();
        // for (String host : hosts) {
        // System.out.println("host:" + host);
        // }
        // }
        // fs.close();
    }

}
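A minimal usage sketch, calling only methods defined in this class (the /tmp/demo paths below are hypothetical, and the NameNode address in the HDFS constant is specific to the author's cluster):

    JobConf conf = HdfsDAO.config();
    HdfsDAO dao = new HdfsDAO(conf);                       // defaults to the HDFS constant above
    dao.mkdirs("/tmp/demo");                               // create a working folder
    dao.createFile("/tmp/demo/hello.txt", "hello hdfs");   // write a small file
    dao.cat("/tmp/demo/hello.txt");                        // stream it back to stdout
    dao.download("/tmp/demo/hello.txt", "hello-local.txt");
    dao.rmr("/tmp/demo");                                  // deleted when the handle closes

Note that each method opens and closes its own FileSystem handle, which keeps the calls independent but would be wasteful inside a tight loop.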
Recommand.java (package org.conan.myhadoop.recommand)
@@ -0,0 +1,46 @@
package org.conan.myhadoop.recommand;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

// Driver for the item-based CF recommender. The four MapReduce steps share a
// map of HDFS paths, each step reading the output of the one before it.
public class Recommand {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        path.put("data", "logfile/small.csv");                    // local ratings file
        path.put("Step1Input", HDFS + "/user/hdfs/recommand");    // HDFS working directory
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        path.put("Step3Input1", path.get("Step1Output"));
        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
        path.put("Step3Input2", path.get("Step2Output"));
        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
        path.put("Step4Input1", path.get("Step3Output1"));
        path.put("Step4Input2", path.get("Step3Output2"));
        path.put("Step4Output", path.get("Step1Input") + "/step4");

        Step1.run(path);
        Step2.run(path);
        Step3.run1(path);
        Step3.run2(path);
        Step4.run(path);
        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommand.class);
        conf.setJobName("Recommand");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

}
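The path wiring above lays the whole pipeline out under one HDFS working directory (Step3.java and Step4.java are among this commit's 10 changed files but are not visible on this page):

    /user/hdfs/recommand            raw ratings uploaded by Step1
    /user/hdfs/recommand/step1      user vectors: Step1 output, input to Step2 and Step3
    /user/hdfs/recommand/step2      co-occurrence counts: Step2 output, input to Step3
    /user/hdfs/recommand/step3_1    Step3 output 1, Step4 input 1
    /user/hdfs/recommand/step3_2    Step3 output 2, Step4 input 2
    /user/hdfs/recommand/step4      Step4 output, presumably the final recommendation scores

A launch command along the lines of: hadoop jar recommand.jar org.conan.myhadoop.recommand.Recommand (the jar name here is hypothetical) would run the whole chain.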
Step1.java (package org.conan.myhadoop.recommand)
@@ -0,0 +1,89 @@
package org.conan.myhadoop.recommand;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

// Step 1: turn raw (userID, itemID, pref) triples into one vector per user:
//   userID -> itemID1:pref1,itemID2:pref2,...
public class Step1 {

    // Emits userID -> "itemID:pref" for every rating record.
    public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> {
        private final static IntWritable k = new IntWritable();
        private final static Text v = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            String[] tokens = Recommand.DELIMITER.split(value.toString());
            int userID = Integer.parseInt(tokens[0]);
            String itemID = tokens[1];
            String pref = tokens[2];
            k.set(userID);
            v.set(itemID + ":" + pref);
            output.collect(k, v);
        }
    }

    // Joins all of a user's "itemID:pref" pairs into one comma-separated vector.
    public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(IntWritable key, Iterator<Text> values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            StringBuilder sb = new StringBuilder();
            while (values.hasNext()) {
                sb.append("," + values.next());
            }
            v.set(sb.toString().replaceFirst(",", ""));  // drop the leading comma
            output.collect(key, v);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommand.config();

        String input = path.get("Step1Input");
        String output = path.get("Step1Output");

        // Reset the working directory and upload the local ratings file.
        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step1_ToItemPreMapper.class);
        conf.setCombinerClass(Step1_ToUserVectorReducer.class);
        conf.setReducerClass(Step1_ToUserVectorReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        // runJob blocks until the job finishes, so the loop below normally never spins.
        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

}
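On the sample small.csv above, Step1 should produce one tab-separated vector per user, along these lines (the order of items within a vector depends on the shuffle and is not guaranteed):

    1    101:5.0,102:3.0,103:2.5
    2    101:2.0,102:2.5,103:5.0,104:2.0
    3    101:2.0,104:4.0,105:4.5,107:5.0
    4    101:5.0,103:3.0,104:4.5,106:4.0
    5    101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0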
Step2.java (package org.conan.myhadoop.recommand)
@@ -0,0 +1,85 @@
package org.conan.myhadoop.recommand;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

// Step 2: build the item co-occurrence matrix from the Step 1 user vectors.
// Every pair of items appearing in the same user's vector counts 1.
public class Step2 {
    // Input line: "userID<TAB>itemID1:pref1,itemID2:pref2,...".
    // tokens[0] is the userID, so both pair loops start at index 1.
    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static Text k = new Text();
        private final static IntWritable v = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String[] tokens = Recommand.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
                for (int j = 1; j < tokens.length; j++) {
                    String itemID2 = tokens[j].split(":")[0];
                    k.set(itemID + ":" + itemID2);  // includes i == j, so the diagonal holds item frequencies
                    output.collect(k, v);
                }
            }
        }
    }

    // Sums the 1s per item pair; also used as the combiner, which is safe for addition.
    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommand.config();

        String input = path.get("Step2Input");
        String output = path.get("Step2Output");

        // Clear any previous run's output before the job starts.
        HdfsDAO hdfs = new HdfsDAO(Recommand.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}
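Worked on the sample data, here are a few of the counts Step2 should emit for item 101. Each count is the number of users whose vectors contain both items, and because the inner loop also pairs an item with itself, the diagonal entry 101:101 is simply the number of users who rated 101:

    101:101    5    (all five users rated 101)
    101:102    3    (users 1, 2, 5)
    101:103    4    (users 1, 2, 4, 5)
    101:104    4    (users 2, 3, 4, 5)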