-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDataCleaning.java
59 lines (54 loc) · 2.02 KB
/
DataCleaning.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package mongoDump;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
/**
 * Loads a CSV file through Spark and reshapes the rows whose tag is of
 * interest into nested {@code year -> tag -> value} maps.
 *
 * @author Sudev Ambadi
 *
 */
public class DataCleaning {

    /** Number of trailing characters of the date column that form the year. */
    private static final int YEAR_LENGTH = 4;

    /**
     * Reads the CSV file at {@code filepath} (first line treated as a header)
     * with the {@code com.databricks.spark.csv} data source, keeps only the
     * rows whose tag column is contained in {@code filterTag}, and turns each
     * remaining row into a pair
     * {@code (value of column pos_cname, { year : { tag : value } })}.
     *
     * <p>The year is the last four characters of the date column. Rows whose
     * date cell is {@code null} or shorter than four characters are dropped,
     * because no year can be derived from them; without this guard a single
     * such row would fail the task with a {@link NullPointerException} or
     * {@link StringIndexOutOfBoundsException}.
     *
     * <p>Each input row yields its own pair; entries sharing the same key are
     * not merged here.
     *
     * @param sc        Spark context used to create the {@code SQLContext}
     * @param filepath  location of the CSV file to load
     * @param filterTag tags to keep; it is captured by the Spark closures, so
     *                  it presumably has to be serializable (a {@code HashSet}
     *                  is) -- confirm against the caller
     * @param pos_tag   index of the tag column
     * @param pos_cname index of the column used as the key of the result
     * @param pos_date  index of the date column; the year is expected in its
     *                  last four characters
     * @param pos_value index of the value column
     * @return one {@code (key, { year : { tag : value } })} pair per retained
     *         row
     */
    public static JavaPairRDD<String, Map<String, Map<String, String>>> dataclean(
            JavaSparkContext sc, String filepath, final Set<String> filterTag,
            final int pos_tag, final int pos_cname,
            final int pos_date, final int pos_value) {
        SQLContext sqlContext = new SQLContext(sc);
        DataFrame df = sqlContext.read().format("com.databricks.spark.csv")
                .option("header", "true").load(filepath);
        JavaRDD<Row> rowrdd = df.javaRDD();

        JavaPairRDD<String, Map<String, Map<String, String>>> mapedrdd = rowrdd
                .filter(new Function<Row, Boolean>() {
                    @Override
                    public Boolean call(Row r) throws Exception {
                        if (!filterTag.contains(r.getString(pos_tag))) {
                            return false;
                        }
                        // Drop rows without a usable date so the mapping
                        // step below can always derive a year.
                        return yearOf(r.getString(pos_date)) != null;
                    }
                })
                .mapToPair(
                        new PairFunction<Row, String, Map<String, Map<String, String>>>() {
                            @Override
                            public Tuple2<String, Map<String, Map<String, String>>> call(
                                    Row r) {
                                String label = r.getString(pos_tag);

                                // create a map of the form { tag : value }
                                Map<String, String> valueByTag = new HashMap<String, String>();
                                valueByTag.put(label, r.getString(pos_value));

                                // Non-null here: the filter above already
                                // rejected rows without a usable date.
                                String year = yearOf(r.getString(pos_date));

                                // create a map of the form
                                // { year : { tag : value } }
                                Map<String, Map<String, String>> byYear =
                                        new HashMap<String, Map<String, String>>();
                                byYear.put(year, valueByTag);

                                return new Tuple2<String, Map<String, Map<String, String>>>(
                                        r.getString(pos_cname), byYear);
                            }
                        });
        return mapedrdd;
    }

    /**
     * Returns the last {@link #YEAR_LENGTH} characters of {@code date}, or
     * {@code null} when {@code date} is {@code null} or too short to contain
     * them.
     */
    private static String yearOf(String date) {
        if (date == null || date.length() < YEAR_LENGTH) {
            return null;
        }
        return date.substring(date.length() - YEAR_LENGTH);
    }
}