Skip to content

Commit f4cf285

Browse files
committed
tweaked spark files
1 parent 73c5c8e commit f4cf285

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

spark/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ RUN wget -O mongo-hadoop-core-${MONGO_HADOOP_VERSION}.jar http://search.maven.or
3030
# get mongo java driver
3131
RUN wget http://central.maven.org/maven2/org/mongodb/mongo-java-driver/3.2.2/mongo-java-driver-${MONGO_JAVA_VERSION}.jar
3232

33+
# Run single node of spark
3334
RUN ${HOME}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}/sbin/start-master.sh
3435

3536
COPY files/times.json /home/ubuntu/times.json

spark/files/reduceByKey.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ mongoConfig.set("mongo.input.query", "{'myid':{$in:[1,2,3,4,5]}}")
2626
mongoConfig.set("mongo.input.sort", "{timestamp:-1}")
2727

2828
val documents = sc.newAPIHadoopRDD(mongoConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject])
29+
// print out inputs
30+
documents.foreach(println)
2931

3032
val outputRDD = documents.map(
3133
(tuple)=>((tuple._2.get("myid")), (tuple._2.get("timestamp")))
@@ -37,13 +39,12 @@ val outputRDD = documents.map(
3739
}
3840
)
3941

42+
// print out output
4043
outputRDD.foreach(println)
4144

45+
// save result to MongoDB
4246
val outputConfig = new Configuration()
4347
outputConfig.set("mongo.output.uri", "mongodb://mongodb:27017/spark.output")
44-
45-
// write out result
4648
outputRDD.saveAsNewAPIHadoopFile("file:///x", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], outputConfig)
4749

48-
4950
System.exit(0);

0 commit comments

Comments (0)