Skip to content

Commit fddc84f

Browse files
committed
MapR [SPARK-325] Add examples for work with the MapRDB JSON connector into the Spark project (apache#361)
1 parent 23cdf46 commit fddc84f

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

examples/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@
126126
<scope>provided</scope>
127127
<version>${project.version}</version>
128128
</dependency>
129+
<dependency>
130+
<groupId>com.mapr.db</groupId>
131+
<artifactId>maprdb-spark</artifactId>
132+
<scope>provided</scope>
133+
<version>${project.version}</version>
134+
</dependency>
129135
</dependencies>
130136

131137
<build>
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
1 hello world
2+
2 this is the text for the wordcount example of the maprdb connector
3+
3 first word of each line will be saved as id column by the maprdb ojai connector
4+
4 word1 word2 word1 word2
5+
5 word1 word3 word3 word3
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
// scalastyle:off println
19+
package org.apache.spark.examples.maprdbconnector
20+
21+
import org.apache.spark.sql.SparkSession
22+
23+
import com.mapr.db.spark.sql._
24+
25+
object MaprDBJsonConnectorWordCount {
26+
27+
def main(args: Array[String]): Unit = {
28+
29+
parseArgs(args)
30+
31+
val pathToFileWithData = args(0)
32+
val tableName = args(1)
33+
val tableNameWithResult = args(2)
34+
35+
val spark = SparkSession
36+
.builder()
37+
.appName("OJAI MaprDB connector wordcount example")
38+
.getOrCreate()
39+
40+
import spark.implicits._
41+
val wordSequenceDS = importDataIntoSeq(pathToFileWithData).toDS()
42+
43+
wordSequenceDS.saveToMapRDB(tableName, createTable = true)
44+
45+
val dfWithDataFromMaprDB = spark.loadFromMapRDB(tableName)
46+
.flatMap(line => line.getAs[String](1).split(" "))
47+
.groupBy("value")
48+
.count()
49+
50+
println("Dataset with counted words:")
51+
dfWithDataFromMaprDB.show()
52+
53+
dfWithDataFromMaprDB.withColumn("_id", $"value")
54+
.saveToMapRDB(tableNameWithResult, createTable = true)
55+
println("Dataset with counted words was saved into the MaprDB table.")
56+
57+
spark.stop()
58+
}
59+
60+
private def parseArgs(args: Array[String]): Unit = {
61+
if (args.length != 3) {
62+
printUsage()
63+
System.exit(1)
64+
}
65+
}
66+
67+
private def printUsage(): Unit = {
68+
val usage =
69+
"""OJAI MaprDB connector wordcount example
70+
|Usage:
71+
|1) path to the file with data (words.txt can be used for the test);
72+
|2) name of the MaprDB table where data from file will be saved;
73+
|3) name of the MaprDB table where result will be saved;
74+
|""".stripMargin
75+
76+
println(usage)
77+
}
78+
79+
private def importDataIntoSeq(filePath: String): Seq[Word] = {
80+
scala.io.Source.fromURL(filePath)
81+
.getLines
82+
.map(line => {
83+
val wordWithId = line.split(" ")
84+
Word(wordWithId(0), wordWithId.drop(1).mkString(" "))
85+
}).toSeq
86+
}
87+
88+
private case class Word(_id: String, words: String)
89+
90+
}

0 commit comments

Comments
 (0)