Commit 57e3c51

Feature/scala code/ch07 deepak (#17)
* ch07-DataframeRedisWriter
* ch07:DatasourceMongodbWriter
* ch07:DatasourceMongodbReader
* ch07-DatasourceTextfileWriter
* ch07-DatasourceTextfileReader
1 parent ebaa218 commit 57e3c51

File tree: 2 files changed, +107 −0 lines

DatasourceTextfileReader.scala — 54 additions & 0 deletions (new file)
@@ -0,0 +1,54 @@
package org.data.algorithms.spark.ch07

import org.apache.spark.sql.SparkSession

import scala.io.Source

object DatasourceTextfileReader {

  // print the raw contents of the input file, line by line
  def debugFile(inputPath: String): Unit = {
    val bufferedSource = Source.fromFile(inputPath)
    for (line <- bufferedSource.getLines())
      println(line)
    bufferedSource.close()
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      System.err.println("Usage: DatasourceTextfileReader <input-path>")
      System.exit(-1)
    }
    // create an instance of SparkSession
    val spark =
      SparkSession.
        builder().
        master("local[*]").
        getOrCreate()

    // read name of input file
    val inputPath = args(0)
    println(s"inputPath: ${inputPath}")
    debugFile(inputPath)
    /*
    ================================================
    Create an RDD[String] from a given text file
    ================================================
    */
    val records = spark.sparkContext.textFile(inputPath)
    println(s"records = ${records}")
    println(s"records.count() = ${records.count()}")
    println(s"records.collect() = ${records.collect().mkString("[",",","]")}")
    /*
    ================================================
    Transform an RDD[String] into an RDD[Int]
    ================================================
    */
    val numbers = records.flatMap(rec => rec.split(",")).map(_.toInt)
    println(s"numbers = ${numbers}")
    println(s"numbers.count() = ${numbers.count()}")
    println(s"numbers.collect() = ${numbers.collect().mkString("[",",","]")}")
    // Done.
    spark.stop()
  }
}
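
To exercise the reader end to end, a minimal sketch along the following lines could be used. The demo object, the /tmp/sample_numbers.txt path, and the two-line sample content are illustrative assumptions and not part of this commit; since the reader splits each line on commas and applies .toInt, the input must contain only comma-separated integers.

package org.data.algorithms.spark.ch07

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object DatasourceTextfileReaderDemo {
  def main(args: Array[String]): Unit = {
    // hypothetical input location; any local path with comma-separated integers works
    val path = Paths.get("/tmp/sample_numbers.txt")
    // two lines of comma-separated integers, matching the reader's
    // flatMap(_.split(",")).map(_.toInt) transformation
    Files.write(path, "1,2,3\n4,5,6\n".getBytes(StandardCharsets.UTF_8))
    // expected: records.count() is 2 and numbers.collect() yields [1,2,3,4,5,6]
    DatasourceTextfileReader.main(Array(path.toString))
  }
}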
DatasourceTextfileWriter.scala — 53 additions & 0 deletions (new file)
@@ -0,0 +1,53 @@
package org.data.algorithms.spark.ch07

import org.apache.spark.sql.SparkSession

object DatasourceTextfileWriter {
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      System.err.println("Usage: DatasourceTextfileWriter <output-path>")
      System.exit(-1)
    }
    // create an instance of SparkSession
    val spark =
      SparkSession.
        builder().
        master("local[*]").
        getOrCreate()

    // read name of the output path
    val outputPath = args(0)
    println(s"outputPath: ${outputPath}")
    /*
    ================================================
    Create an RDD[String]
    ================================================
    */
    val data = List("data element 1", "data element 2", "data element 3", "data element 4")
    println(s"data = ${data}")
    val records = spark.sparkContext.parallelize(data)
    println(s"records = ${records}")
    println(s"records.count() = ${records.count()}")
    println(s"records.collect() = ${records.collect().mkString("[",",","]")}")
    /*
    ================================================
    Save an RDD[String] to the output path
    ================================================
    */
    records.saveAsTextFile(outputPath)
    /*
    ================================================
    Read back from the output path and create an RDD[String]
    ================================================
    */
    val loadedRecords = spark.sparkContext.textFile(outputPath)
    println(s"loadedRecords = ${loadedRecords}")
    println(s"loadedRecords.count() = ${loadedRecords.count()}")
    println(s"loadedRecords.collect() = ${loadedRecords.collect().mkString("[",",","]")}")

    // Done.
    spark.stop()
  }
}
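
Because saveAsTextFile() writes a directory of part files and fails if the target path already exists, one way to invoke the writer is to point it at a not-yet-created child of a fresh temporary directory. The demo object and the naming below are assumptions for illustration, not part of this commit.

package org.data.algorithms.spark.ch07

import java.nio.file.Files

object DatasourceTextfileWriterDemo {
  def main(args: Array[String]): Unit = {
    // generate a unique temporary directory and hand Spark a child path
    // that does not exist yet (saveAsTextFile refuses to overwrite)
    val outputPath = Files.createTempDirectory("ch07-writer-").resolve("records").toString
    DatasourceTextfileWriter.main(Array(outputPath))
    // after the run, outputPath is a directory holding _SUCCESS and part-* files
  }
}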
