
Commit 1299715

translate streaming output mode to write operators

1 parent 927081d

File tree: 10 files changed, +75 −97 lines

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Lines changed: 2 additions & 4 deletions
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -362,16 +362,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder {
       private var inputSchema: StructType = _

       override def withInputDataSchema(schema: StructType): WriteBuilder = {
         this.inputSchema = schema
         this
       }

-      override def outputMode(mode: OutputMode): WriteBuilder = this
-
       override def buildForStreaming(): StreamingWrite = {
         import scala.collection.JavaConverters._

sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/SupportsOutputMode.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala

Lines changed: 3 additions & 4 deletions
@@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**

@@ -42,9 +41,9 @@ private[noop] object NoopTable extends Table with SupportsBatchWrite with Suppor
 }

 private[noop] object NoopWriteBuilder extends WriteBuilder
-  with SupportsSaveMode with SupportsOutputMode {
+  with SupportsSaveMode with SupportsTruncate {
   override def mode(mode: SaveMode): WriteBuilder = this
-  override def outputMode(mode: OutputMode): WriteBuilder = this
+  override def truncate(): WriteBuilder = this
   override def buildForBatch(): BatchWrite = NoopBatchWrite
   override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

Lines changed: 1 addition & 9 deletions
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock

@@ -515,14 +514,7 @@ class MicroBatchExecution(
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
       case s: SupportsStreamingWrite =>
-        // TODO: we should translate OutputMode to concrete write actions like truncate, but
-        // the truncate action is being developed in SPARK-26666.
-        val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-          .withQueryId(runId.toString)
-          .withInputDataSchema(newAttributePlan.schema)
-        val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-          .outputMode(outputMode)
-          .buildForStreaming()
+        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
         WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 34 additions & 0 deletions
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 import java.util.concurrent.locks.{Condition, ReentrantLock}

+import scala.collection.JavaConverters._
 import scala.collection.mutable.{Map => MutableMap}
 import scala.util.control.NonFatal

@@ -34,10 +35,14 @@ import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.StreamingExplainCommand
 import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
+import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

@@ -574,6 +579,35 @@ abstract class StreamExecution(
     Option(name).map(_ + "<br/>").getOrElse("") +
       s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
   }
+
+  protected def createStreamingWrite(
+      table: SupportsStreamingWrite,
+      options: Map[String, String],
+      inputPlan: LogicalPlan): StreamingWrite = {
+    val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
+      .withQueryId(runId.toString)
+      .withInputDataSchema(inputPlan.schema)
+    outputMode match {
+      case Append =>
+        writeBuilder.buildForStreaming()
+
+      case Complete =>
+        // TODO: we should do this check earlier when we have capability API.
+        require(writeBuilder.isInstanceOf[SupportsTruncate],
+          table.name + " does not support Complete mode.")
+        writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()
+
+      case Update =>
+        // Although no v2 sinks really support Update mode now, but during tests we do want them
+        // to pretend to support Update mode, and treat Update mode same as Append mode.
+        if (Utils.isTesting) {
+          writeBuilder.buildForStreaming()
+        } else {
+          throw new IllegalArgumentException(
+            "Data source v2 streaming sinks does not support Update mode.")
+        }
+    }
+  }
 }

 object StreamExecution {

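The new createStreamingWrite helper is the core of this commit: the translation from OutputMode to a concrete write action now lives in the engine instead of in each sink, with Complete mode expressed as "truncate, then append". The standalone sketch below models that translation; it is illustrative only, the names are made up for the sketch, and it does not use the Spark API.

```scala
// Standalone sketch (not Spark code): models how createStreamingWrite above maps an
// output mode onto a concrete write action.
object OutputModeTranslation {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Complete extends OutputMode
  case object Update extends OutputMode

  /** Returns true when the sink must truncate its existing data before the write. */
  def needTruncate(mode: OutputMode, sinkSupportsTruncate: Boolean, isTesting: Boolean): Boolean =
    mode match {
      case Append => false
      case Complete =>
        // Mirrors the require(...) in the diff: Complete needs a sink that can truncate.
        require(sinkSupportsTruncate, "sink does not support Complete mode")
        true
      case Update =>
        // Mirrors the diff: Update is only tolerated in tests, where it behaves like Append.
        require(isTesting, "v2 streaming sinks do not support Update mode")
        false
    }

  def main(args: Array[String]): Unit = {
    println(needTruncate(Append, sinkSupportsTruncate = false, isTesting = false))  // false
    println(needTruncate(Complete, sinkSupportsTruncate = true, isTesting = false)) // true
  }
}
```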
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala

Lines changed: 5 additions & 5 deletions
@@ -21,9 +21,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
 import org.apache.spark.sql.sources.v2._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType

 case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)

@@ -64,15 +63,16 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
   override def schema(): StructType = StructType(Nil)

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _

       override def withInputDataSchema(schema: StructType): WriteBuilder = {
         this.inputSchema = schema
         this
      }

-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Console sink is special that it just prints all the records.
+      override def truncate(): WriteBuilder = this

       override def buildForStreaming(): StreamingWrite = {
         assert(inputSchema != null)

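The console sink keeps no data between batches, so honoring truncate() costs nothing, while a sink that does retain data must actually clear it. A standalone sketch of that distinction (illustrative names only, not the Spark API):

```scala
// Standalone sketch (not Spark API): truncate() is a real action for a sink that retains
// data, and a safe no-op for a print-only sink such as the console sink above.
object TruncateSemantics {
  trait SimpleSink {
    def truncate(): Unit
    def write(rows: Seq[Int]): Unit
  }

  class BufferingSink extends SimpleSink {
    val retained = scala.collection.mutable.Buffer[Int]()
    def truncate(): Unit = retained.clear()        // Complete mode must drop earlier batches
    def write(rows: Seq[Int]): Unit = retained ++= rows
  }

  class PrintOnlySink extends SimpleSink {
    def truncate(): Unit = ()                      // nothing is retained, nothing to clear
    def write(rows: Seq[Int]): Unit = rows.foreach(println)
  }

  def main(args: Array[String]): Unit = {
    val sink = new BufferingSink
    sink.write(Seq(1, 2)); sink.truncate(); sink.write(Seq(3))
    println(sink.retained)                         // ArrayBuffer(3)
    new PrintOnlySink().write(Seq(4, 5))           // prints 4 and 5
  }
}
```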
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala

Lines changed: 1 addition & 9 deletions
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Clock

@@ -175,14 +174,7 @@ class ContinuousExecution(
         "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }

-    // TODO: we should translate OutputMode to concrete write actions like truncate, but
-    // the truncate action is being developed in SPARK-26666.
-    val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
-      .withQueryId(runId.toString)
-      .withInputDataSchema(withNewSources.schema)
-    val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
-      .outputMode(outputMode)
-      .buildForStreaming()
+    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
     val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)

     reportTimeTaken("queryPlanning") {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala

Lines changed: 6 additions & 5 deletions
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
-import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
-import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.types.StructType

 /**

@@ -46,15 +45,17 @@ case class ForeachWriterTable[T](
   override def schema(): StructType = StructType(Nil)

   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    new WriteBuilder with SupportsOutputMode {
+    new WriteBuilder with SupportsTruncate {
       private var inputSchema: StructType = _

       override def withInputDataSchema(schema: StructType): WriteBuilder = {
         this.inputSchema = schema
         this
       }

-      override def outputMode(mode: OutputMode): WriteBuilder = this
+      // Do nothing for truncate. Foreach sink is special that it just forwards all the records to
+      // ForeachWriter.
+      override def truncate(): WriteBuilder = this

       override def buildForStreaming(): StreamingWrite = {
         new StreamingWrite {

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
3030
import org.apache.spark.sql.catalyst.expressions.Attribute
3131
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
3232
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
33-
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
3433
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
3534
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
3635
import org.apache.spark.sql.sources.v2.writer._
37-
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
38-
import org.apache.spark.sql.streaming.OutputMode
36+
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
3937
import org.apache.spark.sql.types.StructType
4038

4139
/**
@@ -49,12 +47,12 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
4947
override def schema(): StructType = StructType(Nil)
5048

5149
override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
52-
new WriteBuilder with SupportsOutputMode {
53-
private var mode: OutputMode = _
50+
new WriteBuilder with SupportsTruncate {
51+
private var needTruncate: Boolean = false
5452
private var inputSchema: StructType = _
5553

56-
override def outputMode(mode: OutputMode): WriteBuilder = {
57-
this.mode = mode
54+
override def truncate(): WriteBuilder = {
55+
this.needTruncate = true
5856
this
5957
}
6058

@@ -64,7 +62,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
6462
}
6563

6664
override def buildForStreaming(): StreamingWrite = {
67-
new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
65+
new MemoryStreamingWrite(MemorySinkV2.this, inputSchema, needTruncate)
6866
}
6967
}
7068
}
@@ -101,27 +99,20 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
10199
}.mkString("\n")
102100
}
103101

104-
def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
102+
def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
105103
val notCommitted = synchronized {
106104
latestBatchId.isEmpty || batchId > latestBatchId.get
107105
}
108106
if (notCommitted) {
109107
logDebug(s"Committing batch $batchId to $this")
110-
outputMode match {
111-
case Append | Update =>
112-
val rows = AddedData(batchId, newRows)
113-
synchronized { batches += rows }
114-
115-
case Complete =>
116-
val rows = AddedData(batchId, newRows)
117-
synchronized {
118-
batches.clear()
119-
batches += rows
120-
}
121-
122-
case _ =>
123-
throw new IllegalArgumentException(
124-
s"Output mode $outputMode is not supported by MemorySinkV2")
108+
val rows = AddedData(batchId, newRows)
109+
if (needTruncate) {
110+
synchronized {
111+
batches.clear()
112+
batches += rows
113+
}
114+
} else {
115+
synchronized { batches += rows }
125116
}
126117
} else {
127118
logDebug(s"Skipping already committed batch: $batchId")
@@ -139,32 +130,32 @@ case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
139130
extends WriterCommitMessage {}
140131

141132
class MemoryStreamingWrite(
142-
val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
133+
val sink: MemorySinkV2, schema: StructType, needTruncate: Boolean)
143134
extends StreamingWrite {
144135

145136
override def createStreamingWriterFactory: MemoryWriterFactory = {
146-
MemoryWriterFactory(outputMode, schema)
137+
MemoryWriterFactory(schema)
147138
}
148139

149140
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
150141
val newRows = messages.flatMap {
151142
case message: MemoryWriterCommitMessage => message.data
152143
}
153-
sink.write(epochId, outputMode, newRows)
144+
sink.write(epochId, needTruncate, newRows)
154145
}
155146

156147
override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
157148
// Don't accept any of the new input.
158149
}
159150
}
160151

161-
case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
152+
case class MemoryWriterFactory(schema: StructType)
162153
extends DataWriterFactory with StreamingDataWriterFactory {
163154

164155
override def createWriter(
165156
partitionId: Int,
166157
taskId: Long): DataWriter[InternalRow] = {
167-
new MemoryDataWriter(partitionId, outputMode, schema)
158+
new MemoryDataWriter(partitionId, schema)
168159
}
169160

170161
override def createWriter(
@@ -175,7 +166,7 @@ case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
175166
}
176167
}
177168

178-
class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)
169+
class MemoryDataWriter(partition: Int, schema: StructType)
179170
extends DataWriter[InternalRow] with Logging {
180171

181172
private val data = mutable.Buffer[Row]()

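MemorySinkV2.write above carries the sink-side half of the translation: truncate-before-append when the flag is set, on top of the existing skip of re-committed batches. A condensed standalone model of that commit logic (illustrative only, not the Spark class):

```scala
// Standalone sketch (not Spark code): the commit behavior of MemorySinkV2.write above —
// ignore a batch that was already committed, and clear prior batches when needTruncate is set.
object MemoryWriteModel {
  final case class AddedData(batchId: Long, rows: Seq[Int])
  private val batches = scala.collection.mutable.ArrayBuffer[AddedData]()

  def write(batchId: Long, needTruncate: Boolean, newRows: Seq[Int]): Unit = synchronized {
    val notCommitted = batches.lastOption.forall(_.batchId < batchId)
    if (notCommitted) {
      if (needTruncate) batches.clear()   // Complete mode: only the latest batch survives
      batches += AddedData(batchId, newRows)
    }                                     // else: skip the already committed batch
  }

  def main(args: Array[String]): Unit = {
    write(0, needTruncate = true, Seq(1, 2))
    write(1, needTruncate = true, Seq(3))
    write(1, needTruncate = true, Seq(99)) // duplicate commit of batch 1, ignored
    println(batches)                       // ArrayBuffer(AddedData(1, List(3)))
  }
}
```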
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala

Lines changed: 2 additions & 2 deletions
@@ -29,7 +29,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("data writer") {
     val partition = 1234
     val writer = new MemoryDataWriter(
-      partition, OutputMode.Append(), new StructType().add("i", "int"))
+      partition, new StructType().add("i", "int"))
     writer.write(InternalRow(1))
     writer.write(InternalRow(2))
     writer.write(InternalRow(44))

@@ -44,7 +44,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
   test("streaming writer") {
     val sink = new MemorySinkV2
     val write = new MemoryStreamingWrite(
-      sink, OutputMode.Append(), new StructType().add("i", "int"))
+      sink, new StructType().add("i", "int"), needTruncate = false)
     write.commit(0,
       Array(
         MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),

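The updated test only exercises the needTruncate = false path. A hedged companion sketch for the truncating path, using only the constructor and commit signature visible in this diff (the surrounding test fixtures are assumed, and the expected outcome is stated as a comment rather than an assertion):

```scala
// Hedged sketch, not part of the commit: exercising the new needTruncate flag.
// With needTruncate = true, each committed epoch should clear previously committed batches,
// so after epoch 1 the sink is expected to retain only Row(3).
val sink = new MemorySinkV2
val write = new MemoryStreamingWrite(
  sink, new StructType().add("i", "int"), needTruncate = true)
write.commit(0, Array(MemoryWriterCommitMessage(0, Seq(Row(1), Row(2)))))
write.commit(1, Array(MemoryWriterCommitMessage(0, Seq(Row(3)))))
```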