
[SPARK-27579][SQL] remove BaseStreamingSource and BaseStreamingSink #24471

Closed
wants to merge 1 commit

@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{BaseStreamingSink, Sink, Source}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.TableCapability._
@@ -354,7 +354,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsRead with SupportsWrite with BaseStreamingSink {
with SupportsRead with SupportsWrite {

override def name(): String = s"Kafka $strategy"

@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
@@ -94,7 +95,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
message: String = "",
topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {

override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
query match {
// Make sure no Spark job is running when deleting a topic
case Some(m: MicroBatchExecution) => m.processAllAvailable()
@@ -114,7 +115,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
query.nonEmpty,
"Cannot add data when there is no query for finding the active kafka source")

val sources: Seq[BaseStreamingSource] = {
val sources: Seq[SparkDataStream] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _) => source
case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||

This file was deleted.

This file was deleted.

@@ -18,7 +18,6 @@
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;

/**
* The base interface representing a readable data stream in a Spark streaming query. It's
@@ -28,7 +27,7 @@
* {@link MicroBatchStream} and {@link ContinuousStream}.
*/
@Evolving
public interface SparkDataStream extends BaseStreamingSource {
public interface SparkDataStream {

/**
* Returns the initial offset for a streaming query to start reading from. Note that the
@@ -50,4 +49,9 @@ public interface SparkDataStream extends BaseStreamingSource {
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);

/**
* Stop this source and free any resources it has allocated.
*/
void stop();
}
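
For illustration (not part of this PR's diff), a minimal Scala sketch of a SparkDataStream implementation after this change; `SimpleOffset` and `SimpleDataStream` are hypothetical names, and the point is only that `stop()` now comes from SparkDataStream itself rather than the removed BaseStreamingSource:

```scala
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream}

// Hypothetical offset type: a single monotonically increasing long.
case class SimpleOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

// Hypothetical stream implementing the full SparkDataStream contract.
class SimpleDataStream extends SparkDataStream {
  // Start reading from the beginning.
  override def initialOffset(): Offset = SimpleOffset(0L)

  // Rebuild an offset from the JSON form stored in the checkpoint.
  override def deserializeOffset(json: String): Offset = SimpleOffset(json.toLong)

  // Everything up to `end` has been processed; nothing to clean up in this sketch.
  override def commit(end: Offset): Unit = ()

  // Previously declared on BaseStreamingSource; now part of SparkDataStream.
  override def stop(): Unit = ()
}
```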

@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.BaseStreamingSink
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer._
@@ -40,7 +39,7 @@ class NoopDataSource extends TableProvider with DataSourceRegister {
override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable
}

private[noop] object NoopTable extends Table with SupportsWrite with BaseStreamingSink {
private[noop] object NoopTable extends Table with SupportsWrite {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder
override def name(): String = "noop-table"
override def schema(): StructType = new StructType()

@@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relat
import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock

@@ -38,7 +38,7 @@ class MicroBatchExecution(
name: String,
checkpointRoot: String,
analyzedPlan: LogicalPlan,
sink: BaseStreamingSink,
sink: Table,
trigger: Trigger,
triggerClock: Clock,
outputMode: OutputMode,
Expand All @@ -48,7 +48,7 @@ class MicroBatchExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

@volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
@@ -354,7 +354,7 @@ class MicroBatchExecution(
if (isCurrentBatchConstructed) return true

// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[BaseStreamingSource, Option[Offset]] = uniqueSources.map {
val latestOffsets: Map[SparkDataStream, Option[Offset]] = uniqueSources.map {
case s: Source =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
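
Since uniqueSources is now a Seq[SparkDataStream], it can hold v1 sources and v2 micro-batch streams behind one type, which is what the pattern match above relies on. A hedged sketch of that idea (the `describe` helper and its inputs are hypothetical, not code from this PR):

```scala
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, SparkDataStream}

// Both the v1 Source trait and the v2 MicroBatchStream interface are SparkDataStreams now,
// so one collection (and one Map key type) covers both API versions.
def describe(streams: Seq[SparkDataStream]): Seq[String] = streams.map {
  case s: Source           => s"v1 source: $s"
  case m: MicroBatchStream => s"v2 micro-batch stream: $m"
  case other               => s"other stream: $other"
}
```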

@@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream

/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
@@ -39,7 +40,7 @@ case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMet
* This method is typically used to associate a serialized offset with actual sources (which
* cannot be serialized).
*/
def toStreamProgress(sources: Seq[BaseStreamingSource]): StreamProgress = {
def toStreamProgress(sources: Seq[SparkDataStream]): StreamProgress = {
assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
s"Cannot continue.")

@@ -29,7 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalP
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.sources.v2.Table
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, SparkDataStream}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
@@ -44,7 +45,7 @@ import org.apache.spark.util.Clock
trait ProgressReporter extends Logging {

case class ExecutionStats(
inputRows: Map[BaseStreamingSource, Long],
inputRows: Map[SparkDataStream, Long],
stateOperators: Seq[StateOperatorProgress],
eventTimeStats: Map[String, String])

@@ -55,10 +56,10 @@ trait ProgressReporter extends Logging {
protected def triggerClock: Clock
protected def logicalPlan: LogicalPlan
protected def lastExecution: QueryExecution
protected def newData: Map[BaseStreamingSource, LogicalPlan]
protected def newData: Map[SparkDataStream, LogicalPlan]
protected def sinkCommitProgress: Option[StreamWriterCommitProgress]
protected def sources: Seq[BaseStreamingSource]
protected def sink: BaseStreamingSink
protected def sources: Seq[SparkDataStream]
protected def sink: Table
protected def offsetSeqMetadata: OffsetSeqMetadata
protected def currentBatchId: Long
protected def sparkSession: SparkSession
@@ -67,8 +68,8 @@ trait ProgressReporter extends Logging {
// Local timestamps and counters.
private var currentTriggerStartTimestamp = -1L
private var currentTriggerEndTimestamp = -1L
private var currentTriggerStartOffsets: Map[BaseStreamingSource, String] = _
private var currentTriggerEndOffsets: Map[BaseStreamingSource, String] = _
private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _
private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _
// TODO: Restore this from the checkpoint when possible.
private var lastTriggerStartTimestamp = -1L

@@ -240,9 +241,9 @@ trait ProgressReporter extends Logging {
}

/** Extract number of input rows for each streaming source in plan */
private def extractSourceToNumInputRows(): Map[BaseStreamingSource, Long] = {
private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = {

def sumRows(tuples: Seq[(BaseStreamingSource, Long)]): Map[BaseStreamingSource, Long] = {
def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = {
tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
}

@@ -262,7 +263,7 @@ trait ProgressReporter extends Logging {
val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
case s: MicroBatchScanExec =>
val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
val source = s.stream.asInstanceOf[BaseStreamingSource]
val source = s.stream
source -> numRows
}
logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t"))

@@ -17,14 +17,21 @@

package org.apache.spark.sql.execution.streaming

import java.util

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.v2.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

/**
* An interface for systems that can collect the results of a streaming query. In order to preserve
* exactly once semantics a sink must be idempotent in the face of multiple attempts to add the same
* batch.
*
 * Note that we extend `Table` here to make the v1 streaming sink API compatible with
 * data source v2.
*/
trait Sink extends BaseStreamingSink {
trait Sink extends Table {

/**
* Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
@@ -38,4 +45,16 @@ trait Sink extends BaseStreamingSink {
* after data is consumed by sink successfully.
*/
def addBatch(batchId: Long, data: DataFrame): Unit

override def name: String = {
throw new IllegalStateException("should not be called.")
}

override def schema: StructType = {
throw new IllegalStateException("should not be called.")
}

override def capabilities: util.Set[TableCapability] = {
throw new IllegalStateException("should not be called.")
}
}
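
With these throwing defaults in place, a user-defined v1 sink still only has to implement addBatch. A minimal sketch under that assumption (`PrintingSink` is a hypothetical name, not code from this PR):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// A trivial v1 sink: the Table methods (name/schema/capabilities) are inherited
// from Sink and keep the "should not be called" defaults added above.
class PrintingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // A real sink must stay idempotent: re-delivery of the same batchId must not duplicate output.
    println(s"Batch $batchId: ${data.count()} rows")
  }
}
```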

@@ -18,14 +18,19 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.types.StructType

/**
* A source of continually arriving data for a streaming query. A [[Source]] must have a
* monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
* will regularly query each [[Source]] to see if any more data is available.
*
 * Note that we extend `SparkDataStream` here to make the v1 streaming source API compatible
 * with data source v2.
*/
trait Source extends BaseStreamingSource {
trait Source extends SparkDataStream {

/** Returns the schema of the data from this source */
def schema: StructType
@@ -62,6 +67,15 @@ trait Source extends BaseStreamingSource {
*/
def commit(end: Offset) : Unit = {}

/** Stop this source and free any resources it has allocated. */
def stop(): Unit
override def initialOffset(): OffsetV2 = {
throw new IllegalStateException("should not be called.")
}

override def deserializeOffset(json: String): OffsetV2 = {
throw new IllegalStateException("should not be called.")
}

override def commit(end: OffsetV2): Unit = {
throw new IllegalStateException("should not be called.")
}
}
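
Similarly, a v1 source keeps its original surface (schema, getOffset, getBatch, stop), while the data source v2 methods it now inherits from SparkDataStream fall back to the throwing defaults above. A minimal sketch (`EmptySource` is a hypothetical name, not code from this PR):

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// A v1 source that never produces data, showing which members a concrete Source
// still has to implement after it started extending SparkDataStream.
class EmptySource(spark: SparkSession) extends Source {
  override def schema: StructType = StructType(StructField("value", LongType) :: Nil)

  // No data ever arrives, so there is never a new offset to report.
  override def getOffset: Option[Offset] = None

  // Return an empty micro-batch with the declared schema.
  override def getBatch(start: Option[Offset], end: Offset): DataFrame =
    spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

  // Inherited from SparkDataStream (previously from the removed BaseStreamingSource).
  override def stop(): Unit = ()
}
```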

@@ -40,7 +40,8 @@ import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.SupportsWrite
import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
@@ -69,7 +70,7 @@ abstract class StreamExecution(
override val name: String,
private val checkpointRoot: String,
analyzedPlan: LogicalPlan,
val sink: BaseStreamingSink,
val sink: Table,
val trigger: Trigger,
val triggerClock: Clock,
val outputMode: OutputMode,
@@ -205,7 +206,7 @@ abstract class StreamExecution(
/**
* A list of unique sources in the query plan. This will be set when generating logical plan.
*/
@volatile protected var uniqueSources: Seq[BaseStreamingSource] = Seq.empty
@volatile protected var uniqueSources: Seq[SparkDataStream] = Seq.empty

/** Defines the internal state of execution */
protected val state = new AtomicReference[State](INITIALIZING)
@@ -214,7 +215,7 @@ abstract class StreamExecution(
var lastExecution: IncrementalExecution = _

/** Holds the most recent input data for each source. */
protected var newData: Map[BaseStreamingSource, LogicalPlan] = _
protected var newData: Map[SparkDataStream, LogicalPlan] = _

@volatile
protected var streamDeathCause: StreamingQueryException = null