
Commit 5c9f900

Add DataFrame filter/SQL examples. Rename reduceByKey.scala file to examples.scala
1 parent f0e9c3e commit 5c9f900

4 files changed: +35 -14 lines changed


README.md

Lines changed: 5 additions & 2 deletions
@@ -23,16 +23,19 @@ ${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:
 ```
 
 
-For example, please see [reduceByKey.scala](spark/files/reduceByKey.scala) to query from mongodb, run a simple aggregation, and finally write output back to mongodb. This file will also be available inside of the spark container in `/home/ubuntu/reduceByKey.scala`
+For example, please see [examples.scala](spark/files/examples.scala) to query from mongodb, run a simple aggregation, dataframe SQL and finally write output back to mongodb. This file will also be available inside of the spark container in `/home/ubuntu/examples.scala`
 
 Run the `spark shell` by executing:
 
 ```sh
 ${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION}
 ```
 
-You can also append `-i <file.scala>` to execute a scala file via the spark shell.
+You can also append `-i <file.scala>` to execute a scala file via the spark shell. For example:
 
+```sh
+${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION} -i ./examples.scala
+```
 
 ### More Information.

spark/Dockerfile

Lines changed: 2 additions & 2 deletions
@@ -17,11 +17,11 @@ ENV SCALA_VERSION 2.10
 
 WORKDIR ${HOME}
 
-ENV ${HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
+ENV SPARK_HOME ${HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
 
 COPY files/times.json /home/ubuntu/times.json
 COPY files/readme.txt /home/ubuntu/readme.txt
-COPY files/reduceByKey.scala /home/ubuntu/reduceByKey.scala
+COPY files/examples.scala /home/ubuntu/examples.scala
 COPY files/initDocuments.scala /home/ubuntu/initDocuments.scala
 
 RUN chown -R ubuntu:ubuntu /home/ubuntu/*

spark/files/reduceByKey.scala renamed to spark/files/examples.scala

Lines changed: 21 additions & 2 deletions
@@ -1,5 +1,8 @@
 
 import com.mongodb.spark._
+import com.mongodb.spark.config._
+import org.apache.spark.sql.SQLContext
+
 import org.bson.Document
 
 /* Load collection as RDD */
@@ -44,15 +47,31 @@ val aggRdd = rdd.withPipeline(Seq(
 println("MongoDB aggregation pipeline reult: ")
 aggRdd.foreach(println)
 
-
 // Save result to MongoDB
 // 1) Default
-import com.mongodb.spark.config._
 aggRdd.saveToMongoDB()
 // 2) Using helper and WriteConfig to modify destination
 outputRDD.saveToMongoDB(WriteConfig(Map("uri"->"mongodb://mongodb:27017/spark.processing")))
 println("RDD is written to MongoDB")
 
+/* DataFrames examples */
+val sqlContext = SQLContext.getOrCreate(sc)
+val df = MongoSpark.load(sqlContext)
+// Print schema
+df.printSchema()
+// Filter by Integer and by String
+df.filter(df("myid") < 2).show()
+df.filter(df("doc") === "V ").show()
+
+// DataFrames SQL example
+df.registerTempTable("temporary")
+val sqlResult = sqlContext.sql("SELECT myid, doc, timestamp FROM temporary WHERE myid > 6 AND doc='V '")
+sqlResult.show()
+// Save out the filtered DataFrame result
+MongoSpark.save(sqlResult.write.option("collection", "DF_times").mode("overwrite"))
+// Read it back in
+MongoSpark.load(sqlContext, ReadConfig(Map("collection" -> "DF_times"), Some(ReadConfig(sqlContext)))).show()
+
 println("Done")
 System.exit(0);
 
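The new DataFrame code above is written to be pasted into the spark shell, which already provides `sc`. For reference, here is a minimal self-contained sketch of the same filter/SQL round trip as a standalone app; it assumes the mongo-spark-connector 1.x API and the Spark 1.6-era `SQLContext`, and the `DataFrameExample` object name is made up for illustration:

```scala
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical standalone version of the spark-shell snippets above.
object DataFrameExample extends App {
  val conf = new SparkConf()
    .setAppName("DataFrameExample")
    .set("spark.mongodb.input.uri", "mongodb://mongodb:27017/spark.times")
    .set("spark.mongodb.output.uri", "mongodb://mongodb:27017/spark.output")
  val sc = new SparkContext(conf)
  val sqlContext = SQLContext.getOrCreate(sc)

  // Load spark.times as a DataFrame; the connector infers the schema by sampling.
  val df = MongoSpark.load(sqlContext)
  df.printSchema()

  // Same SQL filter as examples.scala (note the trailing space in 'V ').
  df.registerTempTable("temporary")
  val sqlResult = sqlContext.sql(
    "SELECT myid, doc, timestamp FROM temporary WHERE myid > 6 AND doc='V '")

  // Overwrite the DF_times collection with the filtered rows, then read it back.
  MongoSpark.save(sqlResult.write.option("collection", "DF_times").mode("overwrite"))
  MongoSpark.load(sqlContext,
    ReadConfig(Map("collection" -> "DF_times"), Some(ReadConfig(sqlContext)))).show()

  sc.stop()
}
```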

spark/files/readme.txt

Lines changed: 7 additions & 8 deletions
@@ -1,16 +1,15 @@
 
-# set data : You can run mongoimport on host to import into 'mongodb' docker instance
-# to find out the IP on OSX docker-machine, you can use `docker-machine ip default`
-mongoimport -h <mongodb ip> -d spark -c times ./times.json
-
-# Or you can just use initDocuments.scala to import using Spark itself
+# set data : You can run use initDocuments.scala to import using Spark itself.
 ${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_2.10:1.0.0 -i ./initDocuments.scala
 
 # Run spark-shell
-${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb:27107/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION}
+${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION}
+
+# Or you can run scala file through the shell by specifying `-i <file.scala>`. For example to run `examples.scala` example:
+${SPARK_HOME}/bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://mongodb:27017/spark.times" --conf "spark.mongodb.output.uri=mongodb://mongodb/spark.output" --packages org.mongodb.spark:mongo-spark-connector_${SCALA_VERSION}:${MONGO_SPARK_VERSION} -i ./examples.scala
 
-# Or you can run scala file through the shell by specifying `-i <file.scala>`
 
 # start 1 master/worker
 ${SPARK_HOME}/sbin/start-master.sh
-${SPARK_HOME}/sbin/start-slave.sh spark://spark:7077
+${SPARK_HOME}/sbin/start-slave.sh spark://spark:7077
+
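After the import step in readme.txt, one quick sanity check from the spark shell (a hedged sketch using the same connector API as examples.scala; spark-shell provides `sc`, and the input URI comes from the `--conf` flags above) is to count the documents that landed in spark.times:

```scala
import com.mongodb.spark._

// Count the documents in the collection named by spark.mongodb.input.uri (spark.times).
val count = MongoSpark.load(sc).count()
println(s"spark.times contains $count documents")
```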
