[SPARK-22732] Add Structured Streaming APIs to DataSourceV2 #19925

Closed
wants to merge 33 commits

Conversation

jose-torres
Contributor

What changes were proposed in this pull request?

This PR provides DataSourceV2 API support for structured streaming, including new pieces needed to support continuous processing [SPARK-20928]. High level summary:

  • DataSourceV2 includes new mixins to support micro-batch and continuous reads and writes. For reads, we accept an optional user-specified schema rather than using the ReadSupportWithSchema model, because doing so would severely complicate the interface.

  • DataSourceV2Reader includes new interfaces to read a specific microbatch or read continuously from a given offset. These follow the same setter pattern as the existing Supports* mixins so that they can work with SupportsScanUnsafeRow.

  • DataReader (the per-partition reader) has a new subinterface ContinuousDataReader only for continuous processing. This reader has a special method to check progress, and next() blocks for new input rather than returning false.

  • Offset, an abstract representation of position in a streaming query, is ported to the public API. (Each type of reader will define its own Offset implementation.)

  • DataSourceV2Writer has a new subinterface ContinuousWriter only for continuous processing. Commits to this interface come tagged with an epoch number, as the execution engine will continue to produce new epoch commits as the task continues indefinitely.

Note that this PR does not propose to change the existing DataSourceV2 batch API, or deprecate the existing streaming source/sink internal APIs in spark.sql.execution.streaming.
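
A rough sketch of how these pieces fit together, with simplified Scala signatures; the names follow the summary above but are approximations rather than the exact interfaces added in this PR:

```scala
import java.util.Optional

// Positions in a stream: one for the whole query, one per partition
// (per-partition offsets must be serializable so executors can report them back).
trait Offset { def json: String }
trait PartitionOffset extends Serializable

// Micro-batch reads: the engine sets the desired offset range, then asks the
// reader for read tasks covering exactly that range.
trait MicroBatchReader {
  def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit
  def getStartOffset: Offset
  def getEndOffset: Offset
}

// Continuous reads: the per-partition reader reports its own progress, and next()
// blocks waiting for new input rather than returning false.
trait ContinuousDataReader[T] {
  def next(): Boolean
  def get(): T
  def getOffset: PartitionOffset
}

// Continuous writes: each commit is tagged with the epoch it belongs to, since the
// task keeps producing new epochs indefinitely.
trait ContinuousWriter {
  def commit(epochId: Long, messages: Array[AnyRef]): Unit
}
```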

How was this patch tested?

Toy implementations of the new interfaces with unit tests.

@SparkQA

SparkQA commented Dec 7, 2017

Test build #84622 has finished for PR 19925 at commit 1fdb2cc.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 7, 2017

Test build #84623 has finished for PR 19925 at commit be0b8e9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 8, 2017

Test build #84624 has finished for PR 19925 at commit 22d07cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* contain the same records.
* @param schema the schema of the data to be written.
* @param mode the output mode which determines what successive batch output means to this
* source, please refer to {@link OutputMode} for more details.
Contributor

to this sink? not source

Contributor Author

Good point. Fixed here and in ContinuousWriteSupport.

@SparkQA

SparkQA commented Dec 11, 2017

Test build #84728 has finished for PR 19925 at commit 7c46b33.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Contributor Author

retest this please

@SparkQA

SparkQA commented Dec 12, 2017

Test build #84737 has finished for PR 19925 at commit 7c46b33.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Contributor Author

retest this please

Member

@zsxwing zsxwing left a comment

Overall looks good! This is really exciting. Just left some comments.

*
* These offsets must be serializable.
*/
public interface PartitionOffset {
Member

extends Serializable


package org.apache.spark.sql.sources.v2.reader;

public abstract class Offset {
Member

nit: add javadoc

}

override def readSchema(): StructType = {
StructType(
Member

nit: You can reuse org.apache.spark.sql.execution.streaming.RateSourceProvider.SCHEMA

override def createReadTasks(): java.util.List[ReadTask[Row]] = {
val partitionStartMap = Option(offset.orElse(null)).map {
case o: ContinuousRateStreamOffset => o.partitionToStartValue
case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
Member

A v2 reader should never see SerializedOffset. Right? The engine is supposed to call deserializeOffset to get the real offset object.
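
A hypothetical sketch of that contract, with illustrative names (not the committed code): if the engine always deserializes the persisted JSON first, the reader only ever matches on its own concrete offset type.

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// Illustrative stand-in for the reader's concrete offset type.
case class RateStreamOffsetSketch(partitionToStartValue: Map[Int, Long])

object RateStreamOffsetSketch {
  // The engine calls this on the persisted JSON before handing the offset back to
  // the reader, so createReadTasks never needs a SerializedOffset case.
  def fromJson(json: String): RateStreamOffsetSketch = {
    implicit val formats = DefaultFormats
    RateStreamOffsetSketch(Serialization.read[Map[Int, Long]](json))
  }
}
```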

case s: SerializedOffset => Serialization.read[Map[Int, Long]](s.json)
case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource")
}
if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
Member

_.size != numPartitions. Changing numPartitions will generate either duplicated or discontinuous data. Right?

case _ => throw new IllegalArgumentException("invalid offset type for ContinuousRateSource")
}
if (partitionStartMap.exists(_.keySet.size > numPartitions)) {
throw new IllegalArgumentException("Start offset contained too many partitions.")
Member

Could you make the error message clearer? For example: The previous run has XXX partitions but the new one has XXX partitions. The "numPartitions" option must not be changed.
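
A sketch of the suggested check and message; the names mirror the quoted snippet, but this is not the code that was ultimately committed:

```scala
object PartitionCountCheck {
  // Fail fast with a message that says what changed and how to fix it.
  def validate(startMap: Option[Map[Int, Long]], numPartitions: Int): Unit = {
    startMap.foreach { m =>
      if (m.size != numPartitions) {
        throw new IllegalArgumentException(
          s"The previous run had ${m.size} partitions, but the current run is " +
          s"configured with $numPartitions. The 'numPartitions' option must not be changed.")
      }
    }
  }
}
```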

// Set the timestamp for the first time.
if (currentRow == null) nextReadTime = System.currentTimeMillis() + 1000

if (numReadRows == rowsPerSecond) {
Member

The timestamp column will be uneven in the current implementation. Could you instead calculate how long to sleep between rows and sleep before emitting each row?

Contributor Author

I could, but the unevenness matches the microbatch behavior. Having it sleep for each row would make it significantly harder to write reliable tests.

Member

The microbatch source generates even timestamps. Here is the code:

InternalRow(DateTimeUtils.fromMillis(relative + localStartTimeMs), v)

Contributor Author

Hm, so it does. I'm not sure why I thought it didn't.

Having said that, I still think even timestamps would pose a serious obstacle to writing reliable tests of the continuous processing engine, which is the primary goal of this implementation. Having a second-long window to commit data and shut down the query lets us figure out exactly which rows should have made it to the sink, at least in Spark unit tests where everything is running locally. If the rows come evenly, we'll have a much harder time figuring out what data is supposed to be present.

Contributor Author

We discussed this offline, and agreed that:

  • it's best for the timestamps to be even
  • it's fine to adjust the rate source tests in the full implementation to only check that at least the expected data is present (but more may have come in).
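
A minimal sketch of the relaxed check described in the second point (helper and names are illustrative):

```scala
object AtLeastCheck {
  // The sink must contain at least the expected rows; extra rows that arrived
  // before shutdown are tolerated rather than failing the test.
  def assertContainsAtLeast(expected: Set[Long], actual: Set[Long]): Unit = {
    val missing = expected -- actual
    assert(missing.isEmpty, s"rows missing from sink: $missing")
  }
}
```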

override def close(): Unit = {}

override def getOffset(): PartitionOffset =
ContinuousRateStreamPartitionOffset(partitionIndex, currentValue - increment)
Member

currentValue - increment will be wrong if the first next() call is interrupted.

Contributor Author

(Your comment made me realize there was a fencepost error here; I'll swap some things around to fix it.)

getOffset(), like get(), is supposed to be paired with next() in a pseudo-iterator, non-thread-safe manner. It'd be incorrect for a caller to expect getOffset() to be correct after a failed next() call.
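
A minimal sketch of that calling pattern, as engine-side pseudo-code rather than the actual execution loop, showing why getOffset() is only meaningful right after a successful next():

```scala
// Simplified stand-ins, for illustration only.
trait SketchPartitionOffset extends Serializable

trait SketchContinuousDataReader[T] {
  def next(): Boolean                  // blocks until a new record is available
  def get(): T                         // the record produced by the last next()
  def getOffset: SketchPartitionOffset // offset of that same record
}

object PartitionLoop {
  // The engine drives the reader from a single thread, reading the offset right
  // after each record; it never asks for an offset after a failed or interrupted next().
  def run[T](reader: SketchContinuousDataReader[T])(emit: (T, SketchPartitionOffset) => Unit): Unit = {
    while (reader.next()) {
      emit(reader.get(), reader.getOffset)
    }
  }
}
```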

override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
val newRows = messages.flatMap {
case message: MemoryWriterCommitMessage => message.data
case _ => Seq()
Member

Is any other message type possible here? If not, it's better to throw an exception.

Contributor Author

I removed the explicit mapping. I don't want to set the expectation that every data source implementation needs to check and throw exceptions here; it's Spark's responsibility to pass in the right type of message.

@SparkQA

SparkQA commented Dec 13, 2017

Test build #84803 has finished for PR 19925 at commit 7c46b33.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 13, 2017

Test build #84815 has finished for PR 19925 at commit 8809bf9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@zsxwing zsxwing left a comment

Thanks for the update. I left two comments about the unit tests.

val reader = new RateStreamV2Reader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
reader.setOffsetRange(Optional.empty(),
Optional.of(LongOffset(System.currentTimeMillis() + 1001)))
Member

This will be flaky when Jenkins is overloaded. Could you add a clock parameter to RateStreamV2Reader and use ManualClock instead, so that we can control the clock?
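
A hedged sketch of the suggested pattern, assuming access to Spark's internal ManualClock as the test suites have; the class and package shown are illustrative, not RateStreamV2Reader's real signature:

```scala
package org.apache.spark.sql.streaming  // illustrative; Clock/ManualClock are private[spark]

import org.apache.spark.util.{Clock, ManualClock, SystemClock}

// A source that computes its deadlines from an injected Clock instead of the wall clock.
class ClockedSourceSketch(clock: Clock = new SystemClock) {
  def batchEndTime(): Long = clock.getTimeMillis() + 1000
}

// With a ManualClock the test controls time explicitly, so it cannot become flaky
// just because the build machine is overloaded.
object ClockedSourceSketchTest {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock()
    val source = new ClockedSourceSketch(clock)
    val deadline = source.batchEndTime()
    clock.advance(1000)                       // move time forward explicitly
    assert(clock.getTimeMillis() == deadline) // no sleeping, no timing races
  }
}
```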

case t: RateStreamReadTask =>
// Read the first 11 rows. Each partition should be outputting 10 rows per second, so
// the 11th row should come 1 second (within a confidence interval) after the first.
val startTime = System.currentTimeMillis()
Member

ditto: use ManualClock to avoid flakiness.

@SparkQA

SparkQA commented Dec 13, 2017

Test build #84874 has finished for PR 19925 at commit 0974ac3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Dec 13, 2017

LGTM pending tests

@SparkQA

SparkQA commented Dec 14, 2017

Test build #84884 has finished for PR 19925 at commit 3cb6cee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing
Member

zsxwing commented Dec 14, 2017

Thanks! Merging to master to unblock #19926. If there are more comments, we can address them in #19926.

@asfgit asfgit closed this in f8c7c1f Dec 14, 2017
* @param queryId A unique string for the writing query. It's possible that there are many writing
* queries running at the same time, and the returned {@link DataSourceV2Writer}
* can use this id to distinguish itself from others.
* @param epochId The uniquenumeric ID of the batch within this writing query. This is an
Contributor

@cloud-fan cloud-fan Dec 14, 2017

typo: unique numeric

/**
* Get the offset of the current record, or the start offset if no records have been read.
*
* The execution engine will call this method along with get() to keep track of the current
Contributor

Better to use a real javadoc link, e.g. {@link DataReader#get}.

import java.util.Optional;

/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
Contributor

It's not a mix-in interface but a variation on DataSourceV2Reader, right?

import java.util.Optional;

/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
Contributor

ditto

*/
public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
/**
* Set the desired offset range for read tasks created from this reader. Read tasks will
Contributor

do you mean this method must be called before createReadTasks?

import org.apache.spark.annotation.InterfaceStability;

/**
* A {@link DataSourceV2Writer} for use with continuous stream processing.
Contributor

A variation of ...?

@cloud-fan
Contributor

shall we move these new interfaces to org.apache.spark.sql.sources.v2.reader/write.streaming package?

@marmbrus
Contributor

I would probably do ... streaming.reader/writer if we are going to namespace it.
