@@ -36,7 +36,6 @@ val outputRDD = rdd.map(
 println("Spark RDD processing result: ")
 outputRDD.foreach(println)
 
-
 /* Similar aggregation as above, but utilising MongoDB aggregation pipeline */
 val aggRdd = rdd.withPipeline(Seq(
   Document.parse("{$sort:{timestamp:1}}"),
@@ -57,8 +56,10 @@ println("RDD is written to MongoDB")
 /* DataFrames examples */
 val sqlContext = SQLContext.getOrCreate(sc)
 val df = MongoSpark.load(sqlContext)
+
 // Print schema
 df.printSchema()
+
 // Filter by Integer and by String
 df.filter(df("myid") < 2).show()
 df.filter(df("doc") === "V ").show()
@@ -67,8 +68,12 @@ df.filter(df("doc") === "V ").show()
 df.registerTempTable("temporary")
 val sqlResult = sqlContext.sql("SELECT myid, doc, timestamp FROM temporary WHERE myid > 6 AND doc='V '")
 sqlResult.show()
+
 // Save out the filtered DataFrame result
 MongoSpark.save(sqlResult.write.option("collection", "DF_times").mode("overwrite"))
+// Alternatively, you could also specify the uri
+// MongoSpark.save(sqlResult.write.option("uri", "mongodb://mongodb:27017/spark.DF_times").mode("overwrite"))
+
 // Read it back in
 MongoSpark.load(sqlContext, ReadConfig(Map("collection" -> "DF_times"), Some(ReadConfig(sqlContext)))).show()
 
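The final read-back could presumably also be driven by a full connection uri instead of a collection name, mirroring the commented-out save alternative above. A minimal sketch, not part of the diff itself, assuming the same mongodb://mongodb:27017/spark deployment and that ReadConfig accepts a "uri" key in its options map:

// Sketch under the assumptions above: build a ReadConfig from a uri and load DF_times back
val dfTimesConfig = ReadConfig(Map("uri" -> "mongodb://mongodb:27017/spark.DF_times"))
MongoSpark.load(sqlContext, dfTimesConfig).show()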