
[SPARK-14287] isStreaming method for Dataset #12080


Closed · wants to merge 4 commits
15 changes: 15 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -448,6 +449,20 @@ class Dataset[T] private[sql](
   */
  def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]

  /**
   * Returns true if this [[Dataset]] contains one or more sources that continuously
   * return data as it arrives. A [[Dataset]] that reads data from a streaming source
   * must be executed as a [[ContinuousQuery]] using the `startStream()` method in
   * [[DataFrameWriter]]. Methods that return a single answer (e.g., `count()` or
   * `collect()`) will throw an [[AnalysisException]] when there is a streaming
   * source present.
   *
   * @group basic
   * @since 2.0.0
   */
  @Experimental
  def isStreaming: Boolean = logicalPlan.find(_.isInstanceOf[StreamingRelation]).isDefined

  /**
   * Displays the [[Dataset]] in a tabular form. Strings more than 20 characters will be truncated,
   * and all cells will be aligned right. For example:
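As a usage note (not part of the diff): a minimal sketch of how a caller might branch on the new flag, written against the Spark 2.0-era API the Scaladoc references. The `sqlContext`, the "text" format, and all paths are placeholder assumptions.

    // Hedged sketch: check isStreaming before running an eager action.
    // sqlContext, the source format, and the paths are hypothetical here;
    // startStream() is the DataFrameWriter method named in the Scaladoc above.
    val df = sqlContext.read.format("text").stream("/tmp/input")

    if (df.isStreaming) {
      // Eager actions such as count() or collect() throw AnalysisException on a
      // streaming Dataset; it must be started as a ContinuousQuery instead.
      val query = df.write
        .option("checkpointLocation", "/tmp/checkpoints")
        .startStream("/tmp/output")
      query.awaitTermination()
    } else {
      println(s"rows: ${df.count()}") // safe on a static Dataset
    }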
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -23,6 +23,7 @@ import java.sql.{Date, Timestamp}
import scala.language.postfixOps

import org.apache.spark.sql.catalyst.encoders.OuterScopes
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
@@ -601,6 +602,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
      TupleClass(1, "a")
    )
  }

test("isStreaming returns false for static Dataset") {
val data = Seq(("a", 1), ("b", 2), ("c", 3)).toDS()
assert(!data.isStreaming, "static Dataset returned true for 'isStreaming'.")
}

test("isStreaming returns true for streaming Dataset") {
val data = MemoryStream[Int].toDS()
assert(data.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
}

test("isStreaming returns true after static and streaming Dataset join") {
val static = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("a", "b")
val streaming = MemoryStream[Int].toDS().toDF("b")
val df = streaming.join(static, Seq("b"))
assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
}
}

case class OtherTuple(_1: String, _2: Int)
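For readers unfamiliar with the `MemoryStream` helper these tests rely on, a hedged sketch of its role, assuming the suite's implicits (`sqlContext.implicits._`) and an implicit `SQLContext` are in scope, as `SharedSQLContext` provides:

    // MemoryStream is a test-only source whose plan contains a StreamingRelation,
    // which is exactly the node isStreaming searches for in the logical plan.
    import org.apache.spark.sql.execution.streaming.MemoryStream

    val input = MemoryStream[Int]  // needs an implicit SQLContext and Encoder[Int]
    val ds = input.toDS()          // Dataset[Int] backed by a StreamingRelation
    assert(ds.isStreaming)         // true even before any data is added

    input.addData(1, 2, 3)         // rows become visible to a running query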