11package com .mapr .db .spark .streaming .sink
22
33
4- import com .mapr .db .spark .sql . _
4+ import com .mapr .db .spark ._
55import com .mapr .db .spark .streaming .MapRDBSourceConfig
66import org .apache .spark .internal .Logging
77import org .apache .spark .sql .DataFrame
8+ import org .apache .spark .sql .catalyst .encoders .RowEncoder
9+ import org .apache .spark .sql .catalyst .plans .logical .{Command , LocalRelation , LogicalPlan , Union }
810import org .apache .spark .sql .execution .streaming .Sink
911import org .ojai .DocumentConstants
1012
@@ -26,9 +28,26 @@ private[streaming] class MapRDBSink(parameters: Map[String, String]) extends Sin
2628 val createTable = parameters.getOrElse(MapRDBSourceConfig .CreateTableOption , " false" ).toBoolean
2729 val bulkInsert = parameters.getOrElse(MapRDBSourceConfig .BulkModeOption , " false" ).toBoolean
2830
29- data.saveToMapRDB(tablePath.get, idFieldPath, createTable, bulkInsert)
31+ val logicalPlan : LogicalPlan = {
32+ // For various commands (like DDL) and queries with side effects, we force query execution
33+ // to happen right away to let these side effects take place eagerly.
34+ data.queryExecution.analyzed match {
35+ case c : Command =>
36+ LocalRelation (c.output, data.queryExecution.executedPlan.executeCollect())
37+ case u@ Union (children) if children.forall(_.isInstanceOf [Command ]) =>
38+ LocalRelation (u.output, data.queryExecution.executedPlan.executeCollect())
39+ case _ =>
40+ data.queryExecution.analyzed
41+ }
42+ }
43+
44+ val encoder = RowEncoder (data.schema).resolveAndBind(
45+ logicalPlan.output,
46+ data.sparkSession.sessionState.analyzer)
47+ data.queryExecution.toRdd.map(encoder.fromRow).saveToMapRDB(tablePath.get, createTable, bulkInsert, idFieldPath)
3048
3149 latestBatchId = batchId
3250 }
3351 }
52+
3453}
0 commit comments