 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;


 // doubts
@@ -112,34 +110,46 @@ public void reduce(Text key, Iterable<Text> values,
                     // output
                     // key - "outlier"
                     // value - point
-                    context.write(new Text("Outlier "), new Text(point[0] + "," + point[1]));
+                    context.write(new Text(""), new Text(point[0] + "," + point[1]));
                 }
             }
         }
     }

     public static void main(String[] args) throws Exception {
-
+        if (args.length < 5) {
+            throw new Exception("Pass all the required arguments: input file, output filepath, output filename, radius, threshold");
+        }
         Configuration conf = new Configuration();

-        String inputDataPath = "/Users/badgod/badgod_documents/github/BigDataTutorials/input/project2/xy_coordinates_test2";
-        String outputPath = "/Users/badgod/badgod_documents/github/BigDataTutorials/output/project2/outlier_xy_space_test2/";
-
-        conf.set("xRange", "20");
-        conf.set("yRange", "20");
-        conf.set("radius", "4");
-        conf.set("thresholdK", "4");
-        conf.set("divisions", "50");
+        String inputDataPath = args[0];
+        String outputPath = args[1];
+        String outliersFilename = args[2];
+        conf.set("xRange", "10000");
+        conf.set("yRange", "10000");
+        conf.set("radius", args[3]);
+        conf.set("thresholdK", args[4]);
+        conf.set("divisions", "100");
         conf.set("outputPath", outputPath);
+        int reducers = 5;


+        // Add the config resources below if you are reading from/writing to HDFS
+        String hadoopHome = System.getenv("HADOOP_HOME");
+        if (hadoopHome == null) {
+            throw new Exception("HADOOP_HOME not found. Please make sure the HADOOP_HOME environment variable points to the Hadoop installation directory");
+        }
+        conf.addResource(new Path(hadoopHome + "/etc/hadoop/core-site.xml"));
+        conf.addResource(new Path(hadoopHome + "/etc/hadoop/hdfs-site.xml"));
+
         FileSystem fs = FileSystem.get(conf);
         fs.delete(new Path(outputPath), true);
         Job job = Job.getInstance(conf, "OutlierInXYSpace");

         job.setJarByClass(OutlierInXYSpace.class);
         job.setMapperClass(OutlierInXYSpace.CustomMapper.class);
         job.setReducerClass(OutlierInXYSpace.CustomReducer.class);
+        job.setNumReduceTasks(reducers);

         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
@@ -149,14 +159,16 @@ public static void main(String[] args) throws Exception {

         job.waitForCompletion(true);

-        int reducers = job.getNumReduceTasks();

-        // removing duplicates which help in counting the number of points around a given point
-        // but they are not required to be in output file
+        // 1. Remove duplicate points: they were needed to count the number of points around a given point,
+        //    but are not required in the output file.
+        // 2. Collect all the reducer outputs and append them to a single file, again removing duplicates.
+        Set<String> outliers = new HashSet<>();
         for (int i = 0; i < reducers; i++) {
-            GeneralUtilities.removeDuplicates(outputPath + "/part-r-0000" + i, FileSystem.get(conf));
+            outliers.addAll(GeneralUtilities.readFileIntoIterableHDFS(outputPath + "/part-r-0000" + i, fs));
         }
-
-
+        GeneralUtilities.writeIterableToFileHDFS(outliers, outputPath + "/" + outliersFilename, fs);
+        if (fs != null)
+            fs.close();
     }
 }
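
The merge step above calls two GeneralUtilities helpers whose bodies are not part of this diff. Below is a minimal sketch of what they could look like, assuming they simply read and write newline-delimited text through the HDFS FileSystem API; the project's actual implementations may differ.

// Hypothetical sketch of the GeneralUtilities helpers used in the merge step above.
// Assumption: they read/write newline-delimited text via the HDFS FileSystem API.
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class GeneralUtilities {

    // Read every line of an HDFS file into a list.
    public static List<String> readFileIntoIterableHDFS(String path, FileSystem fs) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path(path)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    // Write each element as one line of an HDFS file, overwriting any existing file.
    public static void writeIterableToFileHDFS(Iterable<String> lines, String path, FileSystem fs) throws IOException {
        try (FSDataOutputStream out = fs.create(new Path(path), true)) {
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}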