[SPARK-26956][SS] remove streaming output mode from data source v2 APIs #23859

Closed · wants to merge 1 commit
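A minimal sketch of the write-builder pattern this PR moves the built-in streaming sinks to (the class name `MySinkWriteBuilder` is illustrative, not from the patch): instead of receiving the query's `OutputMode` through `SupportsOutputMode`, a sink opts into Complete-mode semantics by mixing `SupportsTruncate` into its `WriteBuilder` and honoring `truncate()` before `buildForStreaming()`.

```scala
// Illustrative sketch only; not part of this PR. It shows the post-change pattern:
// the WriteBuilder advertises truncation support instead of accepting an OutputMode.
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType

class MySinkWriteBuilder extends WriteBuilder with SupportsTruncate {
  private var truncateFirst = false
  private var inputSchema: StructType = _

  override def withInputDataSchema(schema: StructType): WriteBuilder = {
    this.inputSchema = schema
    this
  }

  // Called by the engine for Complete mode: existing data should be replaced, not appended.
  override def truncate(): WriteBuilder = {
    this.truncateFirst = true
    this
  }

  override def buildForStreaming(): StreamingWrite = {
    // A real sink would return a StreamingWrite that clears its state when truncateFirst is set.
    ???
  }
}
```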
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -362,16 +362,14 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
new WriteBuilder {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
import scala.collection.JavaConverters._


This file was deleted.
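Judging from the imports and overrides removed throughout this diff, the deleted file is the `SupportsOutputMode` mix-in. A rough, hedged reconstruction of its shape (inferred from its call sites; the actual deleted source may differ, and may have been a Java interface):

```scala
// Hedged reconstruction, inferred from the removed call sites in this PR; not the original source.
package org.apache.spark.sql.sources.v2.writer.streaming

import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.streaming.OutputMode

trait SupportsOutputMode extends WriteBuilder {
  // Implementations received the query's output mode and returned the (possibly updated) builder.
  def outputMode(mode: OutputMode): WriteBuilder
}
```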

@@ -22,8 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -42,9 +41,9 @@ private[noop] object NoopTable extends Table with SupportsBatchWrite with Suppor
}

private[noop] object NoopWriteBuilder extends WriteBuilder
with SupportsSaveMode with SupportsOutputMode {
with SupportsSaveMode with SupportsTruncate {
override def mode(mode: SaveMode): WriteBuilder = this
override def outputMode(mode: OutputMode): WriteBuilder = this
override def truncate(): WriteBuilder = this
override def buildForBatch(): BatchWrite = NoopBatchWrite
override def buildForStreaming(): StreamingWrite = NoopStreamingWrite
}
@@ -31,7 +31,6 @@ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateCo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock

@@ -515,14 +514,7 @@ class MicroBatchExecution(
val triggerLogicalPlan = sink match {
case _: Sink => newAttributePlan
case s: SupportsStreamingWrite =>
// TODO: we should translate OutputMode to concrete write actions like truncate, but
// the truncate action is being developed in SPARK-26666.
val writeBuilder = s.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
.withQueryId(runId.toString)
.withInputDataSchema(newAttributePlan.schema)
val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
.outputMode(outputMode)
.buildForStreaming()
val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
@@ -24,6 +24,7 @@ import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.{Condition, ReentrantLock}

import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import scala.util.control.NonFatal

@@ -34,10 +35,14 @@ import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

@@ -574,6 +579,35 @@ abstract class StreamExecution(
Option(name).map(_ + "<br/>").getOrElse("") +
s"id = $id<br/>runId = $runId<br/>batch = $batchDescription"
}

protected def createStreamingWrite(
table: SupportsStreamingWrite,
options: Map[String, String],
inputPlan: LogicalPlan): StreamingWrite = {
val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
.withQueryId(runId.toString)
.withInputDataSchema(inputPlan.schema)
outputMode match {
case Append =>
writeBuilder.buildForStreaming()

case Complete =>
// TODO: we should do this check earlier when we have capability API.
require(writeBuilder.isInstanceOf[SupportsTruncate],
table.name + " does not support Complete mode.")
writeBuilder.asInstanceOf[SupportsTruncate].truncate().buildForStreaming()

case Update =>
// Although no v2 sinks really support Update mode now, during tests we do want them
// to pretend to support Update mode, and treat Update mode the same as Append mode.

Review comment from the PR author: This is very useful when testing with aggregates/joins. We don't want to complicate the test cases with watermarks, and we can't use Complete mode because some sinks don't support it.

if (Utils.isTesting) {
writeBuilder.buildForStreaming()
} else {
throw new IllegalArgumentException(
"Data source v2 streaming sinks do not support Update mode.")
}
}
}
}

object StreamExecution {
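A hedged usage sketch of what the new `createStreamingWrite` translation above means from the query side (the rate-source aggregation below is made up; `spark` is assumed to be an existing `SparkSession`): Append builds the write directly, Complete requires the sink's `WriteBuilder` to implement `SupportsTruncate`, and Update is only accepted under `Utils.isTesting`.

```scala
// Illustrative only. The console sink in this PR mixes in SupportsTruncate, so a
// Complete-mode query is translated to truncate() + buildForStreaming() instead of
// handing an OutputMode to the sink.
import spark.implicits._  // assumes an existing SparkSession named `spark`

val counts = spark.readStream
  .format("rate")                          // built-in rate source, used only for illustration
  .load()
  .groupBy(($"value" % 10).as("bucket"))
  .count()

val query = counts.writeStream
  .format("console")
  .outputMode("complete")                  // handled via SupportsTruncate under the new API
  .start()
```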
@@ -21,9 +21,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWrite
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType

case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame)
@@ -64,15 +63,16 @@ object ConsoleTable extends Table with SupportsStreamingWrite {
override def schema(): StructType = StructType(Nil)

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this
// Do nothing for truncate. The console sink is special in that it just prints all the records.
override def truncate(): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
assert(inputSchema != null)
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.SupportsOutputMode
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock

@@ -175,14 +174,7 @@ class ContinuousExecution(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
}

// TODO: we should translate OutputMode to concrete write actions like truncate, but
// the truncate action is being developed in SPARK-26666.
val writeBuilder = sink.newWriteBuilder(new DataSourceOptions(extraOptions.asJava))
.withQueryId(runId.toString)
.withInputDataSchema(withNewSources.schema)
val streamingWrite = writeBuilder.asInstanceOf[SupportsOutputMode]
.outputMode(outputMode)
.buildForStreaming()
val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)

reportTimeTaken("queryPlanning") {
@@ -23,9 +23,8 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.python.PythonForeachWriter
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table}
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -46,15 +45,17 @@ case class ForeachWriterTable[T](
override def schema(): StructType = StructType(Nil)

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
new WriteBuilder with SupportsTruncate {
private var inputSchema: StructType = _

override def withInputDataSchema(schema: StructType): WriteBuilder = {
this.inputSchema = schema
this
}

override def outputMode(mode: OutputMode): WriteBuilder = this
// Do nothing for truncate. The foreach sink is special in that it just forwards all the
// records to the ForeachWriter.
override def truncate(): WriteBuilder = this

override def buildForStreaming(): StreamingWrite = {
new StreamingWrite {
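For the foreach sink, `truncate()` is deliberately a no-op because the sink only forwards rows to the user-supplied `ForeachWriter`. A hedged sketch of the user-facing writer this table wraps (the writer below is illustrative, not from the patch):

```scala
// Illustrative only: ForeachWriterTable wraps a user-supplied ForeachWriter such as this one.
// Since truncate() does nothing here, each epoch's rows are simply forwarded to the writer.
import org.apache.spark.sql.{ForeachWriter, Row}

val rowPrinter = new ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true
  override def process(row: Row): Unit = println(row)
  override def close(errorOrNull: Throwable): Unit = ()
}

// Typical usage: df.writeStream.foreach(rowPrinter).start()
```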
@@ -30,12 +30,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite, SupportsOutputMode}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.types.StructType

/**
@@ -49,12 +47,12 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
override def schema(): StructType = StructType(Nil)

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
new WriteBuilder with SupportsOutputMode {
private var mode: OutputMode = _
new WriteBuilder with SupportsTruncate {
private var needTruncate: Boolean = false
private var inputSchema: StructType = _

override def outputMode(mode: OutputMode): WriteBuilder = {
this.mode = mode
override def truncate(): WriteBuilder = {
this.needTruncate = true
this
}

@@ -64,7 +62,7 @@
}

override def buildForStreaming(): StreamingWrite = {
new MemoryStreamingWrite(MemorySinkV2.this, mode, inputSchema)
new MemoryStreamingWrite(MemorySinkV2.this, inputSchema, needTruncate)
}
}
}
@@ -101,27 +99,20 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi
}.mkString("\n")
}

def write(batchId: Long, outputMode: OutputMode, newRows: Array[Row]): Unit = {
def write(batchId: Long, needTruncate: Boolean, newRows: Array[Row]): Unit = {
val notCommitted = synchronized {
latestBatchId.isEmpty || batchId > latestBatchId.get
}
if (notCommitted) {
logDebug(s"Committing batch $batchId to $this")
outputMode match {
case Append | Update =>
val rows = AddedData(batchId, newRows)
synchronized { batches += rows }

case Complete =>
val rows = AddedData(batchId, newRows)
synchronized {
batches.clear()
batches += rows
}

case _ =>
throw new IllegalArgumentException(
s"Output mode $outputMode is not supported by MemorySinkV2")
val rows = AddedData(batchId, newRows)
if (needTruncate) {
synchronized {
batches.clear()
batches += rows
}
} else {
synchronized { batches += rows }
}
} else {
logDebug(s"Skipping already committed batch: $batchId")
@@ -139,32 +130,32 @@ case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row])
extends WriterCommitMessage {}

class MemoryStreamingWrite(
val sink: MemorySinkV2, outputMode: OutputMode, schema: StructType)
val sink: MemorySinkV2, schema: StructType, needTruncate: Boolean)
extends StreamingWrite {

override def createStreamingWriterFactory: MemoryWriterFactory = {
MemoryWriterFactory(outputMode, schema)
MemoryWriterFactory(schema)
}

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
val newRows = messages.flatMap {
case message: MemoryWriterCommitMessage => message.data
}
sink.write(epochId, outputMode, newRows)
sink.write(epochId, needTruncate, newRows)
}

override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
// Don't accept any of the new input.
}
}

case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
case class MemoryWriterFactory(schema: StructType)
extends DataWriterFactory with StreamingDataWriterFactory {

override def createWriter(
partitionId: Int,
taskId: Long): DataWriter[InternalRow] = {
new MemoryDataWriter(partitionId, outputMode, schema)
new MemoryDataWriter(partitionId, schema)
}

override def createWriter(
@@ -175,7 +166,7 @@ case class MemoryWriterFactory(outputMode: OutputMode, schema: StructType)
}
}

class MemoryDataWriter(partition: Int, outputMode: OutputMode, schema: StructType)
class MemoryDataWriter(partition: Int, schema: StructType)
extends DataWriter[InternalRow] with Logging {

private val data = mutable.Buffer[Row]()
@@ -29,7 +29,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
test("data writer") {
val partition = 1234
val writer = new MemoryDataWriter(
partition, OutputMode.Append(), new StructType().add("i", "int"))
partition, new StructType().add("i", "int"))
writer.write(InternalRow(1))
writer.write(InternalRow(2))
writer.write(InternalRow(44))
@@ -44,7 +44,7 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
test("streaming writer") {
val sink = new MemorySinkV2
val write = new MemoryStreamingWrite(
sink, OutputMode.Append(), new StructType().add("i", "int"))
sink, new StructType().add("i", "int"), needTruncate = false)
write.commit(0,
Array(
MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
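To complement the Append-path test above, a hedged sketch of a follow-on test (not part of this patch; `allData` comes from `MemorySinkBase` and is assumed to be available here) exercising the truncate path, where each committed epoch replaces previously stored rows:

```scala
test("streaming writer with truncate") {
  val sink = new MemorySinkV2
  val write = new MemoryStreamingWrite(
    sink, new StructType().add("i", "int"), needTruncate = true)
  write.commit(0,
    Array(
      MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
      MemoryWriterCommitMessage(1, Seq(Row(3)))))
  write.commit(19,
    Array(
      MemoryWriterCommitMessage(0, Seq(Row(11), Row(22)))))
  // With needTruncate = true, epoch 19 replaces the rows committed at epoch 0.
  assert(sink.allData.map(_.getInt(0)).sorted == Seq(11, 22))
}
```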