[SPARK-22732] Add Structured Streaming APIs to DataSourceV2 #19925
Conversation
Test build #84622 has finished for PR 19925 at commit
Test build #84623 has finished for PR 19925 at commit
be0b8e9 to 22d07cd
Test build #84624 has finished for PR 19925 at commit
* contain the same records.
* @param schema the schema of the data to be written.
* @param mode the output mode which determines what successive batch output means to this
* source, please refer to {@link OutputMode} for more details.
to this sink? not source
Good point. Fixed here and in ContinuousWriteSupport.
Test build #84728 has finished for PR 19925 at commit
retest this please
Test build #84737 has finished for PR 19925 at commit
retest this please
Overall looks good! This is really exciting. Just left some comments.
*
* These offsets must be serializable.
*/
public interface PartitionOffset {
extends Serializable
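For illustration, a minimal Scala rendering of the suggested change (the actual interface in the patch is Java; this sketch only shows the shape of the idea):

package org.apache.spark.sql.sources.v2.reader

// Sketch of the suggestion: make the requirement explicit in the type by having the
// interface extend java.io.Serializable instead of only stating it in the javadoc.
trait PartitionOffset extends java.io.Serializable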
package org.apache.spark.sql.sources.v2.reader;

public abstract class Offset {
nit: add javadoc
}

override def readSchema(): StructType = {
  StructType(
nit: You can reuse org.apache.spark.sql.execution.streaming.RateSourceProvider.SCHEMA
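For illustration, the nit amounts to something like this one-liner (a sketch, assuming the named constant is visible from the reader):

// Reuse the shared rate-source schema instead of rebuilding the StructType here.
override def readSchema(): StructType = RateSourceProvider.SCHEMA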
override def createReadTasks(): java.util.List[ReadTask[Row]] = {
  val partitionStartMap = Option(offset.orElse(null)).map {
    case o: ContinuousRateStreamOffset => o.partitionToStartValue
    case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
A v2 reader should never see SerializedOffset. Right? The engine is supposed to call deserializeOffset to get the real offset object.
    case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
    case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource")
  }
  if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
Shouldn't this be _.size != numPartitions? Changing numPartitions will generate either duplicated data or discontinuous data. Right?
    case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource")
  }
  if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
    throw new IllegalArgumentException("Start offset contained too many partitions.")
Could you make the error message clearer? Such as: "The previous run has XXX partitions but the new one has XXX partitions. The numPartitions option must not be changed."
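For illustration, a sketch of the stricter check and message being suggested, written as a standalone helper (names are illustrative, not the exact patch):

// Sketch only: combines the != numPartitions check suggested above with a more
// descriptive error message.
def validateStartPartitions(startMap: Map[Int, Long], numPartitions: Int): Unit = {
  if (startMap.size != numPartitions) {
    throw new IllegalArgumentException(
      s"The previous run contained ${startMap.size} partitions, but numPartitions " +
      s"is currently set to $numPartitions. The numPartitions option must not be changed.")
  }
}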
// Set the timestamp for the first time.
if (currentRow == null) nextReadTime = System.currentTimeMillis() + 1000

if (numReadRows == rowsPerSecond) {
The timestamp column will be uneven in the current implementation. Could you just calculate how long to sleep for each row and sleep per row instead?
I could, but the unevenness matches the microbatch behavior. Having it sleep for each row would make it significantly harder to write reliable tests.
The microbatch source generates even timestamps. Here is the code:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala (line 213 at 3c0c2d0):
InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), v)
Hm, so it does. I'm not sure why I thought it didn't.
Having said that, I still think even timestamps would pose a serious obstacle to writing reliable tests of the continuous processing engine, which is the primary goal of this implementation. Having a second-long window to commit data and shut down the query lets us figure out exactly which rows should have made it to the sink, at least in Spark unit tests where everything is running locally. If the rows come evenly, we'll have a much harder time figuring out what data is supposed to be present.
We discussed this offline, and agreed that:
- it's best for the timestamps to be even
- it's fine to adjust the rate source tests in the full implementation to only check that at least the expected data is present (but more may have come in).
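A sketch of the agreed "even timestamps" approach (names are illustrative and this is not the actual patch): derive each row's timestamp from its index, mirroring the microbatch formula quoted above, instead of reading the wall clock per row.

// Sketch only: row i of a partition is stamped i / rowsPerSecond seconds after the
// start time, so timestamps are evenly spaced no matter when the row is emitted.
def rowTimestampMs(
    startTimeMs: Long,
    rowIndexInPartition: Long,
    rowsPerSecondPerPartition: Long): Long = {
  startTimeMs + (rowIndexInPartition * 1000L) / rowsPerSecondPerPartition
}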
override def close(): Unit = {}

override def getOffset(): PartitionOffset =
  ContinuousRateStreamPartitionOffset(partitionIndex, currentValue - increment)
currentValue - increment will be wrong if the first next() call is interrupted.
(Your comment made me realize there was a fencepost error here, which I'll swap some things around to fix.)
getOffset(), like get(), is supposed to be paired with next() in a pseudo-iterator and non-thread-safe manner. It'd be incorrect for a caller to look for getOffset() to be correct after a failed next() call.
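A sketch of the calling convention being described, as engine-side pseudo-code (the package paths and the helper are assumptions, not actual Spark internals):

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{ContinuousDataReader, PartitionOffset}

// Sketch only: getOffset(), like get(), is paired with next() and is only meaningful
// after next() has returned; none of these calls are thread-safe.
def drain(reader: ContinuousDataReader[Row])(handle: (Row, PartitionOffset) => Unit): Unit = {
  while (reader.next()) {
    val row = reader.get()          // the record produced by this next()
    val offset = reader.getOffset() // the position of that same record
    handle(row, offset)
  }
}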
override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
  val newRows = messages.flatMap {
    case message: MemoryWriterCommitMessage => message.data
    case _ => Seq()
Is there any other possible message type here? If not, it's better to throw an exception.
I removed the explicit mapping. I don't want to set the expectation that every data source implementation needs to check and throw exceptions here; it's Spark's responsibility to pass in the right type of message.
Test build #84803 has finished for PR 19925 at commit
Test build #84815 has finished for PR 19925 at commit
Thanks for the update. Left two comments about unit tests.
val reader = new RateStreamV2Reader(
  new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
reader.setOffsetRange(Optional.empty(),
  Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
this will be flaky when Jenkins is overloaded. Could you add a clock parameter to RateStreamV2Reader and use ManualClock instead so that we can control the clock?
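For illustration, the test could look roughly like this with an injected clock (the extra constructor parameter is hypothetical; ManualClock's advance/getTimeMillis come from org.apache.spark.util):

import java.util.Optional
import scala.collection.JavaConverters._
import org.apache.spark.util.ManualClock

// Sketch only: with a clock parameter on RateStreamV2Reader, the test advances time
// deterministically instead of racing the wall clock.
val clock = new ManualClock()
val reader = new RateStreamV2Reader(
  new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava),
  clock)
reader.setOffsetRange(Optional.empty(), Optional.of(LongOffset(clock.getTimeMillis() + 1001)))
clock.advance(1001)  // stand-in for waiting a bit more than one second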
case t: RateStreamReadTask =>
  // Read the first 11 rows. Each partition should be outputting 10 rows per second, so
  // the 11th row should come 1 second (within a confidence interval) after the first.
  val startTime = System.currentTimeMillis()
ditto: use ManualClock to avoid flakiness.
Test build #84874 has finished for PR 19925 at commit
LGTM pending tests
Test build #84884 has finished for PR 19925 at commit
* @param queryId A unique string for the writing query. It's possible that there are many writing
*                queries running at the same time, and the returned {@link DataSourceV2Writer}
*                can use this id to distinguish itself from others.
* @param epochId The uniquenumeric ID of the batch within this writing query. This is an
typo: unique numeric
/**
 * Get the offset of the current record, or the start offset if no records have been read.
 *
 * The execution engine will call this method along with get() to keep track of the current
better to use a real javadoc link, e.g. {@link DataReader#get}
import java.util.Optional;

/**
 * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
It's not a mix-in interface but a variation on DataSourceV2Reader, right?
import java.util.Optional;

/**
 * A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
ditto
 */
public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
  /**
   * Set the desired offset range for read tasks created from this reader. Read tasks will
do you mean this method must be called before createReadTasks?
import org.apache.spark.annotation.InterfaceStability;

/**
 * A {@link DataSourceV2Writer} for use with continuous stream processing.
A variation of ...?
shall we move these new interfaces to
I would probably do ... streaming.reader/writer if we are going to namespace it.
What changes were proposed in this pull request?
This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High-level summary:
DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user-specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.
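As a rough Scala sketch of the mixin shapes described here (the real interfaces are Java, and the names and parameter lists below are simplified approximations rather than the exact API; only ContinuousWriteSupport and the queryId/schema/mode parameters appear elsewhere in this thread):

import java.util.Optional
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

// Sketch only: a source opts in to streaming by mixing interfaces like these into its
// DataSourceV2 class. Note the read side takes an optional user-specified schema
// directly instead of going through a separate ReadSupportWithSchema-style interface.
trait MicroBatchReadSupportSketch {
  def createMicroBatchReader(
      schema: Optional[StructType],   // optional user-specified schema
      options: DataSourceV2Options): MicroBatchReader
}

trait ContinuousWriteSupportSketch {
  def createContinuousWriter(
      queryId: String,                // see the @param docs quoted in the review above
      schema: StructType,
      mode: OutputMode,
      options: DataSourceV2Options): Optional[ContinuousWriter]
}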
DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.
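A simplified sketch of the setter pattern on the micro-batch side (the helper is made up and imports are omitted; the two calls mirror the reader methods quoted elsewhere in this review):

// Sketch only: the engine first tells the reader which offset range the next batch
// covers, then asks it to plan read tasks for exactly that range.
def planOneMicroBatch(
    reader: MicroBatchReader,
    start: java.util.Optional[Offset],
    end: java.util.Optional[Offset]): java.util.List[ReadTask[Row]] = {
  reader.setOffsetRange(start, end)
  reader.createReadTasks()
}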
DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.
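Sketched as a Scala trait (the real interface is Java; the generic parameter and extends clause are assumptions based on this description):

// Sketch only: get() and close() come from DataReader; next() blocks until new input
// arrives instead of returning false; getOffset() reports how far this partition has read.
trait ContinuousDataReaderSketch[T] extends DataReader[T] {
  def getOffset(): PartitionOffset
}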
Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)
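For example, a reader-specific offset could look roughly like this (the field name follows the ContinuousRateStreamOffset snippet discussed above; treating json() as the serialization hook is an assumption based on how the existing streaming offsets are persisted):

import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// Illustrative sketch of a concrete Offset: it records where each partition of this
// particular source should resume, and serializes itself to JSON for the offset log.
case class RatePartitionsOffsetSketch(partitionToStartValue: Map[Int, Long]) extends Offset {
  private implicit val formats = DefaultFormats
  override def json(): String = Serialization.write(partitionToStartValue)
}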
DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.
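A minimal sketch of what handling epoch-tagged commits could look like, modeled on the memory-sink commit() reviewed above (the class is illustrative, not the PR's sink, and the import path is an assumption):

import scala.collection.mutable
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Sketch only: each commit arrives tagged with the epoch it belongs to, and the engine
// keeps calling commit() with increasing epoch ids for as long as the query runs.
class EpochLoggingWriterSketch {
  private val committedEpochs = mutable.Map.empty[Long, Array[WriterCommitMessage]]

  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    committedEpochs(epochId) = messages  // record what arrived in this epoch
  }
}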
Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.
How was this patch tested?
Toy implementations of the new interfaces with unit tests.