[SPARK-27642][SS] make v1 offset extends v2 offset #24538

Closed · wants to merge 1 commit
@@ -76,7 +76,7 @@ class KafkaContinuousStream(
}

override def planInputPartitions(start: Offset): Array[InputPartition] = {
val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
val oldStartPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets

val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
@@ -20,14 +20,14 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset

/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
* their offsets.
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {

override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
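Note: since `KafkaSourceOffset` is `private[kafka010]`, the narrowing used in `KafkaContinuousStream.planInputPartitions` above only works from inside that package. A minimal sketch of the pattern (the helper object below is hypothetical, not part of this PR):

```scala
package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

// Hypothetical helper, for illustration only: mirrors the cast now used instead of
// KafkaSourceOffset.getPartitionOffsets(start).
object KafkaOffsetCastSketch {
  def partitionOffsets(start: OffsetV2): Map[TopicPartition, Long] =
    start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
}
```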
@@ -18,44 +18,10 @@
package org.apache.spark.sql.execution.streaming;

/**
* This is an internal, deprecated interface. New source implementations should use the
* org.apache.spark.sql.sources.v2.reader.streaming.Offset class, which is the one that will be
* supported in the long term.
* This class is an alias of {@link org.apache.spark.sql.sources.v2.reader.streaming.Offset}. It's
* internal and deprecated. New streaming data source implementations should use data source v2 API,
* which will be supported in the long term.
*
* This class will be removed in a future release.
*/
public abstract class Offset {
/**
* A JSON-serialized representation of an Offset that is
* used for saving offsets to the offset log.
* Note: We assume that equivalent/equal offsets serialize to
* identical JSON strings.
*
* @return JSON string encoding
*/
public abstract String json();

/**
* Equality based on JSON string representation. We leverage the
* JSON representation for normalization between the Offset's
* in memory and on disk representations.
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof Offset) {
return this.json().equals(((Offset) obj).json());
} else {
return false;
}
}

@Override
public int hashCode() {
return this.json().hashCode();
}

@Override
public String toString() {
return this.json();
}
}
public abstract class Offset extends org.apache.spark.sql.sources.v2.reader.streaming.Offset {}
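With the v1 class reduced to an alias, any offset written against the old API is now also a v2 offset and can be passed directly to APIs typed with `org.apache.spark.sql.sources.v2.reader.streaming.Offset`. A rough sketch of that subtyping (`DemoOffset` and `describe` are made up for illustration):

```scala
import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1}
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

// Hypothetical v1-style offset, only to show the subtyping.
case class DemoOffset(n: Long) extends OffsetV1 {
  override def json: String = n.toString
}

// Hypothetical consumer typed against the v2 interface.
def describe(off: OffsetV2): String = s"offset=${off.json}"

describe(DemoOffset(42))  // compiles because the v1 Offset now extends the v2 Offset
```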
@@ -25,13 +25,9 @@
* During execution, offsets provided by the data source implementation will be logged and used as
* restart checkpoints. Each source should provide an offset implementation which the source can use
* to reconstruct a position in the stream up to which data has been seen/processed.
*
* Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
* maintain compatibility with DataSource V1 APIs. This extension will be removed once we
* get rid of V1 completely.
*/
@Evolving
public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
public abstract class Offset {
/**
* A JSON-serialized representation of an Offset that is
* used for saving offsets to the offset log.
@@ -49,9 +45,8 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Of
*/
@Override
public boolean equals(Object obj) {
if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
return this.json()
.equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
if (obj instanceof Offset) {
return this.json().equals(((Offset) obj).json());
} else {
return false;
}
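Because the v1 class is now a subtype of this one, the `instanceof` check against the v1 type becomes redundant; equality stays purely JSON-based. A small sketch of that contract (`OffsetA`/`OffsetB` are hypothetical; plain classes are used so the inherited `equals` applies):

```scala
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

// Two unrelated offset classes that happen to serialize identically.
class OffsetA(n: Long) extends OffsetV2 { override def json: String = n.toString }
class OffsetB(n: Long) extends OffsetV2 { override def json: String = n.toString }

assert(new OffsetA(7) == new OffsetB(7))                    // equal json() => equal offsets
assert(new OffsetA(7).hashCode == new OffsetB(7).hashCode)  // hashCode follows json() too
```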
@@ -17,12 +17,10 @@

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

/**
* A simple offset for sources that produce a single linear stream of data.
*/
case class LongOffset(offset: Long) extends OffsetV2 {
case class LongOffset(offset: Long) extends Offset {

override val json = offset.toString

@@ -37,14 +35,4 @@ object LongOffset {
* @return new LongOffset
*/
def apply(offset: SerializedOffset) : LongOffset = new LongOffset(offset.json.toLong)

/**
* Convert generic Offset to LongOffset if possible.
* @return converted LongOffset
*/
def convert(offset: Offset): Option[LongOffset] = offset match {
case lo: LongOffset => Some(lo)
case so: SerializedOffset => Some(LongOffset(so))
case _ => None
}
}
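With `LongOffset.convert` removed, the callers updated below (`MemoryStream`, `TextSocketMicroBatchStream`) cast instead. A sketch of the replacement pattern, under the assumption that only `LongOffset` instances ever reach the call site (the function name is illustrative):

```scala
import org.apache.spark.sql.execution.streaming.LongOffset
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

// Narrow the generic offset with a cast. Any other offset type now fails fast with
// a ClassCastException rather than the descriptive sys.error convert used to raise.
def latestCommitted(end: OffsetV2): Long =
  end.asInstanceOf[LongOffset].offset
```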
@@ -296,7 +296,7 @@ class MicroBatchExecution(
* batch will be executed before getOffset is called again. */
availableOffsets.foreach {
case (source: Source, end: Offset) =>
val start = committedOffsets.get(source)
val start = committedOffsets.get(source).map(_.asInstanceOf[Offset])
source.getBatch(start, end)
case nonV1Tuple =>
// The V2 API does not have the same edge case requiring getBatch to be called
@@ -354,7 +354,7 @@
if (isCurrentBatchConstructed) return true

// Generate a map from each unique source to the next available offset.
val latestOffsets: Map[SparkDataStream, Option[Offset]] = uniqueSources.map {
val latestOffsets: Map[SparkDataStream, Option[OffsetV2]] = uniqueSources.map {
case s: Source =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
@@ -411,7 +411,7 @@
val prevBatchOff = offsetLog.get(currentBatchId - 1)
if (prevBatchOff.isDefined) {
prevBatchOff.get.toStreamProgress(sources).foreach {
case (src: Source, off) => src.commit(off)
case (src: Source, off: Offset) => src.commit(off)
case (stream: MicroBatchStream, off) =>
stream.commit(stream.deserializeOffset(off.json))
case (src, _) =>
@@ -448,9 +448,9 @@
// Request unprocessed data from all sources.
newData = reportTimeTaken("getBatch") {
availableOffsets.flatMap {
case (source: Source, available)
case (source: Source, available: Offset)
if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val current = committedOffsets.get(source).map(_.asInstanceOf[Offset])
val batch = source.getBatch(current, available)
assert(batch.isStreaming,
s"DataFrame returned by getBatch from $source did not have isStreaming=true\n" +
Expand Down
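The repeated `.map(_.asInstanceOf[Offset])` in these hunks exists because `committedOffsets` is now valued with the v2 offset type while the v1 `Source.getBatch` still takes the v1 `Offset`; the downcast is safe for v1 sources because they only ever produce v1 offsets. A condensed sketch of the pattern (names are illustrative):

```scala
import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1}
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

// Committed offsets are stored with the v2 type...
def v1Start(committed: Option[OffsetV2]): Option[OffsetV1] =
  // ...but a v1 Source needs Option[v1 Offset], so narrow each element.
  committed.map(_.asInstanceOf[OffsetV1])
```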
@@ -24,14 +24,15 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}


/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
* vector clock that must progress linearly forward.
*/
case class OffsetSeq(offsets: Seq[Option[Offset]], metadata: Option[OffsetSeqMetadata] = None) {
case class OffsetSeq(offsets: Seq[Option[OffsetV2]], metadata: Option[OffsetSeqMetadata] = None) {

/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
@@ -57,13 +58,13 @@ object OffsetSeq {
* Returns a [[OffsetSeq]] with a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
*/
def fill(offsets: Offset*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
def fill(offsets: OffsetV2*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)

/**
* Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
*/
def fill(metadata: Option[String], offsets: Offset*): OffsetSeq = {
def fill(metadata: Option[String], offsets: OffsetV2*): OffsetSeq = {
OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
}
}
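`OffsetSeq.fill` now takes the v2 offset type directly; behaviour is unchanged, with `null`s still mapped to `None`. A quick sketch of what that produces (the values are arbitrary):

```scala
import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}

// LongOffset satisfies the v2 Offset parameter via the new alias.
val seq = OffsetSeq.fill(LongOffset(10), null, LongOffset(20))
// seq.offsets == Seq(Some(LongOffset(10)), None, Some(LongOffset(20)))
```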
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets._
import scala.io.{Source => IOSource}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

/**
* This class is used to log offsets to persistent files in HDFS.
@@ -47,7 +48,7 @@ class OffsetSeqLog(sparkSession: SparkSession, path: String)

override protected def deserialize(in: InputStream): OffsetSeq = {
// called inside a try-finally where the underlying stream is closed in the caller
def parseOffset(value: String): Offset = value match {
def parseOffset(value: String): OffsetV2 = value match {
case OffsetSeqLog.SERIALIZED_VOID_OFFSET => null
case json => SerializedOffset(json)
}
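Offsets read back from the log surface as `SerializedOffset` (raw JSON) but are now typed as the v2 offset; the owning source or stream is still responsible for re-parsing them (see the `deserializeOffset` call in MicroBatchExecution above). A sketch, with an arbitrary JSON payload:

```scala
import org.apache.spark.sql.execution.streaming.SerializedOffset
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

// What deserialize() yields for a non-null entry: just the stored JSON, untyped
// beyond the common v2 Offset interface.
val restored: OffsetV2 = SerializedOffset("""{"topic":{"0":100}}""")
restored.json  // the source that wrote it decides how to interpret this string
```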
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.command.StreamingExplainCommand
import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{SupportsWrite, Table}
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.sources.v2.writer.SupportsTruncate
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming._
@@ -438,7 +438,7 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset, timeoutMs: Long): Unit = {
private[sql] def awaitOffset(sourceIndex: Int, newOffset: OffsetV2, timeoutMs: Long): Unit = {
assertAwaitThread()
def notDone = {
val localCommittedOffsets = committedOffsets
@@ -19,15 +19,16 @@ package org.apache.spark.sql.execution.streaming

import scala.collection.{immutable, GenTraversableOnce}

import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}


/**
* A helper class that looks like a Map[Source, Offset].
*/
class StreamProgress(
val baseMap: immutable.Map[SparkDataStream, Offset] =
new immutable.HashMap[SparkDataStream, Offset])
extends scala.collection.immutable.Map[SparkDataStream, Offset] {
val baseMap: immutable.Map[SparkDataStream, OffsetV2] =
new immutable.HashMap[SparkDataStream, OffsetV2])
extends scala.collection.immutable.Map[SparkDataStream, OffsetV2] {

def toOffsetSeq(source: Seq[SparkDataStream], metadata: OffsetSeqMetadata): OffsetSeq = {
OffsetSeq(source.map(get), Some(metadata))
@@ -36,17 +37,17 @@ class StreamProgress(
override def toString: String =
baseMap.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")

override def +[B1 >: Offset](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = {
override def +[B1 >: OffsetV2](kv: (SparkDataStream, B1)): Map[SparkDataStream, B1] = {
baseMap + kv
}

override def get(key: SparkDataStream): Option[Offset] = baseMap.get(key)
override def get(key: SparkDataStream): Option[OffsetV2] = baseMap.get(key)

override def iterator: Iterator[(SparkDataStream, Offset)] = baseMap.iterator
override def iterator: Iterator[(SparkDataStream, OffsetV2)] = baseMap.iterator

override def -(key: SparkDataStream): Map[SparkDataStream, Offset] = baseMap - key
override def -(key: SparkDataStream): Map[SparkDataStream, OffsetV2] = baseMap - key

def ++(updates: GenTraversableOnce[(SparkDataStream, Offset)]): StreamProgress = {
def ++(updates: GenTraversableOnce[(SparkDataStream, OffsetV2)]): StreamProgress = {
new StreamProgress(baseMap ++ updates)
}
}
@@ -61,10 +61,12 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
Dataset.ofRows(sqlContext.sparkSession, logicalPlan)
}

def addData(data: A*): Offset = {
def addData(data: A*): OffsetV2 = {
addData(data.toTraversable)
}

def addData(data: TraversableOnce[A]): OffsetV2

def fullSchema(): StructType = encoder.schema

protected val logicalPlan: LogicalPlan = {
@@ -77,8 +79,6 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
None)(sqlContext.sparkSession)
}

def addData(data: TraversableOnce[A]): Offset

override def initialOffset(): OffsetV2 = {
throw new IllegalStateException("should not be called.")
}
@@ -226,22 +226,15 @@
}

override def commit(end: OffsetV2): Unit = synchronized {
def check(newOffset: LongOffset): Unit = {
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
val newOffset = end.asInstanceOf[LongOffset]
val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}

batches.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
}

LongOffset.convert(end) match {
case Some(lo) => check(lo)
case None => sys.error(s"MemoryStream.commit() received an offset ($end) " +
"that did not originate with an instance of this class")
}
batches.trimStart(offsetDiff)
lastOffsetCommitted = newOffset
}

override def stop() {}
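Net effect of the `commit` change above: the inner `check` helper and the `LongOffset.convert` round-trip are folded into a direct cast, so a foreign offset type now surfaces as a `ClassCastException` rather than the old descriptive error. A rough usage sketch, assuming an active `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}

implicit val sqlContext = spark.sqlContext
import spark.implicits._

val input = MemoryStream[Int]
val off: OffsetV2 = input.addData(1, 2, 3)  // addData is now typed with the v2 Offset
input.commit(off)                           // fine: this is the stream's own LongOffset
```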
@@ -150,10 +150,7 @@ class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int)
}

override def commit(end: Offset): Unit = synchronized {
val newOffset = LongOffset.convert(end).getOrElse(
sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
)
val newOffset = end.asInstanceOf[LongOffset]

val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt

@@ -42,7 +42,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -124,7 +124,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
* the active query, and then return the source object the data was added, as well as the
* offset of added data.
*/
def addData(query: Option[StreamExecution]): (SparkDataStream, Offset)
def addData(query: Option[StreamExecution]): (SparkDataStream, OffsetV2)
}

/** A trait that can be extended when testing a source. */
@@ -135,7 +135,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
case class AddDataMemory[A](source: MemoryStreamBase[A], data: Seq[A]) extends AddData {
override def toString: String = s"AddData to $source: ${data.mkString(",")}"

override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
override def addData(query: Option[StreamExecution]): (SparkDataStream, OffsetV2) = {
(source, source.addData(data))
}
}
@@ -337,7 +337,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
var pos = 0
var currentStream: StreamExecution = null
var lastStream: StreamExecution = null
val awaiting = new mutable.HashMap[Int, Offset]() // source index -> offset to wait for
val awaiting = new mutable.HashMap[Int, OffsetV2]() // source index -> offset to wait for
val sink = new MemorySink
val resetConfValues = mutable.Map[String, Option[String]]()
val defaultCheckpointLocation =