 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 public class SurfaceFormDictionaryWordCount extends Configured implements Tool {
-  private static class HadoopSurfaceFormDictionaryMap extends Mapper<LongWritable, Text, Text, IntWritable> {
+  private static class HadoopSurfaceFormDictionaryMap extends Mapper<LongWritable, Text, Text, Text> {
     @Override
     public void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException {
+      // Input lines have the form surface_form@@target<TAB>count.
       String word = value.toString().split("@@")[0];
+      String target = value.toString().split("@@")[1].split("\t")[0];
       String count = value.toString().split("\t")[1];
 
-      context.write(new Text(word), new IntWritable(Integer.parseInt(count)));
+      context.write(new Text(word), new Text(target + ":" + count));
+    }
+  }
+
+  private static class WikiSenseDictionaryReduce extends Reducer<Text, Text, Text, Text> {
+    @Override
+    public void reduce(Text word, Iterable<Text> targetCounts, Context context)
+        throws IOException, InterruptedException {
+      int numSenses = 0;
+      int totalCount = 0;
+      // Hadoop's values Iterable is single-pass, so collect the target:count
+      // strings while summing rather than re-iterating them for the join below.
+      List<String> senses = new ArrayList<String>();
+      for (Text targetCount : targetCounts) {
+        String tc = targetCount.toString();
+        // Parse the count after the last ':', since targets may contain colons.
+        int count = Integer.parseInt(tc.substring(tc.lastIndexOf(':') + 1));
+        totalCount += count;
+        numSenses++;
+        senses.add(tc);
+      }
+
+      context.write(word, new Text(totalCount + "\t" + numSenses + "\t" + StringUtils.join(senses, " ")));
     }
   }
@@ -45,11 +62,11 @@ public boolean runJob(String inDir, String outDir) throws Exception {
     FileInputFormat.addInputPath(job, new Path(inDir));
     FileOutputFormat.setOutputPath(job, new Path(_outDir));
     job.setMapperClass(HadoopSurfaceFormDictionaryMap.class);
-    job.setReducerClass(IntSumReducer.class);
+    job.setReducerClass(WikiSenseDictionaryReduce.class);
     job.setMapOutputKeyClass(Text.class);
-    job.setMapOutputValueClass(IntWritable.class);
+    job.setMapOutputValueClass(Text.class);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
+    job.setOutputValueClass(Text.class);
     job.setJobName("WikiLinkProcessor:SurfaceFormDictionaryWordCount");
     return job.waitForCompletion(true);
   }
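
For reference, a hypothetical walk-through of the changed pipeline (the Berlin records below are made-up sample data, assuming the surface_form@@target<TAB>count input format the mapper parses; fields are tab-separated). Given the input lines

    Berlin@@Berlin          12
    Berlin@@Berlin_(band)   3

the mapper emits (Berlin, Berlin:12) and (Berlin, Berlin_(band):3), and the reducer writes one dictionary line per surface form carrying its total count, number of senses, and the per-sense counts:

    Berlin    15    2    Berlin:12 Berlin_(band):3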