Skip to content

Commit 1b37b08

Browse files
rsotn-maprmgorbov
authored and committed
MapR [SPARK-21] Structured Streaming MapR-DB Sink created (apache#219)
1 parent 2499aba commit 1b37b08

File tree

3 files changed

+67
-0
lines changed

3 files changed

+67
-0
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.mapr.db.spark.streaming
2+
3+
import com.mapr.db.spark.streaming.sink.MapRDBSink
4+
import org.apache.spark.internal.Logging
5+
import org.apache.spark.sql.SQLContext
6+
import org.apache.spark.sql.execution.streaming.Sink
7+
import org.apache.spark.sql.sources._
8+
import org.apache.spark.sql.streaming.OutputMode
9+
10+
/**
 * Data-source registration point for the MapR-DB Structured Streaming sink.
 *
 * Registered under the short name "maprdb" via [[DataSourceRegister]]; Spark
 * calls [[StreamSinkProvider.createSink]] once per streaming query to obtain
 * the sink instance.
 */
class DefaultSource extends StreamSinkProvider with DataSourceRegister with Logging {

  /** Alias usable as `writeStream.format("maprdb")`. */
  override def shortName(): String = "maprdb"

  /**
   * Builds the streaming sink. All configuration travels in `parameters`;
   * the partition columns and output mode are not consumed here.
   */
  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = new MapRDBSink(parameters)
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.mapr.db.spark.streaming
2+
3+
/**
 * Option keys understood by the MapR-DB streaming sink, together with the
 * fully qualified format name to pass to `writeStream.format(...)`.
 */
object MapRDBSourceConfig {

  /** Fully qualified data-source format name (the package of [[DefaultSource]]). */
  val Format: String = classOf[DefaultSource].getPackage.getName

  /** Key for the target MapR-DB table path (required by the sink). */
  val TablePathOption: String = "tablePath"

  /** Key for the field used as the document id (optional). */
  val IdFieldPathOption: String = "idFieldPath"

  /** Key for whether to create the table on write (optional, boolean-valued). */
  val CreateTableOption: String = "createTable"

  /** Key for bulk-insert mode (optional, boolean-valued). */
  val BulkModeOption: String = "bulkMode"

  // NOTE(review): not read by MapRDBSink in this change — presumably consumed
  // elsewhere in the connector; confirm before relying on it.
  val SampleSizeOption: String = "sampleSize"
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.mapr.db.spark.streaming.sink
2+
3+
4+
import com.mapr.db.spark.sql._
5+
import com.mapr.db.spark.streaming.MapRDBSourceConfig
6+
import org.apache.spark.internal.Logging
7+
import org.apache.spark.sql.DataFrame
8+
import org.apache.spark.sql.execution.streaming.Sink
9+
import org.ojai.DocumentConstants
10+
11+
/**
 * Structured Streaming [[Sink]] that persists each micro-batch to a MapR-DB
 * table via `saveToMapRDB`.
 *
 * @param parameters writer options; see [[MapRDBSourceConfig]] for the keys.
 */
private[streaming] class MapRDBSink(parameters: Map[String, String]) extends Sink with Logging {

  // Highest batch id already written; batches can be re-delivered on recovery,
  // so anything at or below this id is skipped for idempotence.
  @volatile private var latestBatchId = -1L

  override def toString(): String = "MapRDBSink"

  /** Writes one micro-batch, skipping ids already committed by this sink instance. */
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    if (batchId <= latestBatchId) {
      logInfo(s"Skipping already committed batch $batchId")
    } else {
      writeBatch(batchId, data)
    }
  }

  // Resolves the write options, persists the batch, then records the batch id.
  private def writeBatch(batchId: Long, data: DataFrame): Unit = {
    val maybePath = parameters.get(MapRDBSourceConfig.TablePathOption)
    require(maybePath.isDefined, s"'${MapRDBSourceConfig.TablePathOption}' option must be defined")

    // Boolean-valued options default to false when absent.
    def boolOption(key: String): Boolean = parameters.getOrElse(key, "false").toBoolean

    val idField = parameters.getOrElse(MapRDBSourceConfig.IdFieldPathOption, DocumentConstants.ID_KEY)

    data.saveToMapRDB(
      maybePath.get,
      idField,
      boolOption(MapRDBSourceConfig.CreateTableOption),
      boolOption(MapRDBSourceConfig.BulkModeOption))

    latestBatchId = batchId
  }
}

0 commit comments

Comments
 (0)