[SPARK-52110][SDP][SQL] Implement SQL syntax support for pipelines #50875
Conversation
    ;

streamRelationPrimary
    : STREAM multipartIdentifier tableAlias    #streamTableName
Shall we allow the optionsClause? For the Scala API, a streaming table scan can have options: spark.readStream.options(...).table("t"). We should have the same functionality in SQL syntax.
Hmm, a couple thoughts:

- STREAM is an operator that simply marks an existing table or TVF as desired to be treated as a streaming source during analysis/execution. TVFs aside, in the case of a table [referenced by its metastore identifier], the table must have been created in the metastore and therefore should already have its streaming read options defined during creation - ex. CREATE TABLE t USING <provider> OPTIONS(<streaming options applicable to this table provider>) ...
- A streaming DataFrame (whether it uses the STREAM operator or not) can only be executed using writeStream - attempting to directly [batch] execute a streaming DataFrame using something like df.queryExecution.executedPlan will throw. I also believe it's not possible to read directly from a streaming DataFrame; you'd have to write the streaming DF to a sink and then you can readStream from that sink with whatever desired streaming read options.
- Combining points 1 and 2, I believe when doing something like spark.sql("SELECT * FROM STREAM t").writeStream..., the streaming query that's executed to write t's data into the configured sink should respect any relevant options set on t (at creation time) when reading from it.

More generally, the purpose of introducing the STREAM operator is to support defining pipeline streaming flows from batch queries, by marking those batch queries as streaming. This will allow the pipeline to writeStream the batch query that backs the streaming flow into a sink, which would otherwise fail here.
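To make the execution constraint above concrete, here is a minimal, illustrative Scala sketch (not code from this PR; the table name, sink format, and paths are placeholders):

```scala
// Minimal sketch, assuming a table `t` that can be read as a stream.
val streamingDf = spark.readStream.table("t")

// Batch-style execution of a streaming DataFrame throws:
// "Queries with streaming sources must be executed with writeStream.start()"
// streamingDf.collect()

// The only way to execute it is through writeStream:
val query = streamingDf.writeStream
  .format("parquet")                               // hypothetical sink
  .option("checkpointLocation", "/tmp/checkpoint") // required by most sinks
  .start("/tmp/stream-output")
```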
So we won't have a SQL equivalent for spark.readStream.options(...).table("t")? The table t can be created with options, but the scan-time options should overwrite them. I think it's already the behavior of the DataFrame API. cc @HeartSaVioR
> the table must have been created in the metastore and therefore should already have its streaming read options defined during creation

I'm sorry, I don't agree with this. You can have two streaming queries reading from the same table but intending to start from different offsets/versions/etc. There is definitely a need for query-specific options.

Whether we want to bind the options to the STREAM keyword is a bit of a different story, though - a TVF is easier to deal with since it already receives options. If we bind options to STREAM, then a TVF has options and STREAM has options. (Even in that case we may resolve this via overriding, though.)

I just had a quick look at Flink (link) - they put this into table options, but their CREATE TABLE does not always map to the table we refer to - it's closer to the table we call in DSv2, not a Parquet table, Delta table, Iceberg table, etc., which seems to be closer to @cloud-fan's proposal.
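A small, illustrative Scala sketch of the point about query-specific options (the option name startingVersion is only an example of a provider-specific streaming read option, e.g. Delta; it is not something this PR defines):

```scala
// Two streaming reads of the same table with different scan-time options.
// Whether "startingVersion" applies depends entirely on the table's data source.
val fromLatest = spark.readStream.table("t")
val fromVersion5 = spark.readStream
  .option("startingVersion", "5")
  .table("t")
```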
WITH and STREAM are two orthogonal syntaxes:

- FROM STREAM t
- FROM t WITH (...)
- FROM STREAM t WITH (...)

A TVF is different as it can take options on its own, so we probably shouldn't allow FROM STREAM tvf(...) WITH (...).
Cool, added WITH syntax support when using STREAM on a table, but not for TVFs. Also added org.apache.spark.sql.execution.streaming.StreamRelationSuite to show that the specified options are indeed propagated to the streaming relation's data source.
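A hedged sketch of the intended equivalence (the WITH option syntax is shown schematically - the authoritative grammar is in SqlBaseParser.g4 - and maxFilesPerTrigger is just an example of a source-dependent option):

```scala
// DataFrame API: scan-time options passed at read time.
val viaApi = spark.readStream
  .options(Map("maxFilesPerTrigger" -> "1"))  // example option; source-dependent
  .table("t")

// SQL form discussed above: STREAM plus a WITH options clause on the table
// reference. The exact option syntax follows the grammar added in this PR.
val viaSql = spark.sql("SELECT * FROM STREAM t WITH ('maxFilesPerTrigger' = '1')")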
@AnishMahto please fix the test failures in https://github.com/AnishMahto/spark/actions/runs/15058258924/job/42328483705
@@ -37,9 +37,14 @@ import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
  */
 object UnsupportedOperationChecker extends Logging {

-  def checkForBatch(plan: LogicalPlan): Unit = {
+  def checkForBatch(plan: LogicalPlan, isSqlContext: Boolean): Unit = {
Since there is an existing SQLContext, shall we rename the parameter to isPipeline?
> Since there is existing SQLContext

Could you clarify what you mean by this?

Thinking about this more though, I think it still makes sense to show the "Queries with streaming sources must be executed with writeStream.start()" suggestion even if this is a SQL query (i.e. the query is being executed in a SQL context), and additionally suggest an alternative, which is to execute the streaming query with a pipeline - will make this change.
I meant there is an existing class SQLContext (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/classic/SQLContext.scala), which exists in every Spark session.
Ahh gotcha. Decided to just unify the error messages, since users don't have to use pipelines to execute SQL that contains streaming relations - they can still just use readStream.
Heads up all, I made all of the SQL keywords added in this PR (FLOW, MATERIALIZED, STREAM, STREAMING) non-reserved. This should prevent breaking any existing workflows that create table identifiers using these words, for example.
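As a quick illustration of what non-reserved means here (a hypothetical example, not a test from this PR): identifiers that happen to match the new keywords should keep parsing as ordinary names.

```scala
// Hypothetical example: "stream", "flow", and "materialized" used as plain
// identifiers still parse, because the new keywords are non-reserved.
spark.sql("CREATE TABLE stream (flow STRING, materialized INT) USING parquet")
spark.sql("SELECT flow, materialized FROM stream")
```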
import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class StreamRelationSuite extends AnalysisTest {
This is a parser test suite instead of an analysis one.
Related thread: #50875 (comment)

The suite has a combination of both pure parsing and parser + analysis tests. I decided to split the tests in the suite - the pure parsing tests have been moved to org.apache.spark.sql.catalyst.parser.StreamRelationParserSuite, and the analysis and execution tests have been moved to org.apache.spark.sql.execution.streaming.StreamRelationSuite.
@@ -2105,6 +2105,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
           catalog, "table-valued functions")
       }
     }
+    if (u.isStreaming && !resolvedFunc.isStreaming) {
how about the other way around? There is no STREAM keyword but the TVF is streaming. What do we do today? This is a bit unrelated to this PR though, cc @HeartSaVioR
I don't know how we resolve TVFs nowadays - do we ever pass the notion of batch vs. streaming "to" the TVF, or is it expected that the TVF decides on its own?

Thinking back to how data sources work, Spark asks the data source to behave as batch vs. streaming. If a TVF can potentially handle both batch and streaming, it'd be more natural to do the same, instead of having the TVF decide on its own.
These are great questions.

First of all, the intention of the STREAM keyword is to convert a batch relation into a streaming relation, with the primary use case being that it can be used to populate downstream streaming tables in pipelines using a batch source. If a relation is already streaming, the STREAM keyword should just be a no-op.

Second, I think the TVF is responsible for defining whether it's streaming or not on its own. The TableFunctionRegistry trait at least is a registry of function names -> LogicalPlan, and the LogicalPlan backing the TVF can define whether it's streaming or not (ex. Range is not streaming). And it looks like the resolution of a TVF simply replaces it with the corresponding LogicalPlan mapped to that TVF in the registry, which then gets analyzed.

With my changes as-is, for TVFs I'm not converting the LogicalPlan (that backs the TVF, as per the function registry) to be streaming - I am just validating that when STREAM is used on a TVF, the resolved LogicalPlan for the TVF is indeed streaming - but I don't even know if any default/builtin functions in the registry are defined by a streaming logical plan anyway.

When I think about it, I don't actually think the pipelines module will need the ability to convert TVFs to streaming in the first place. In theory users can always define a table that is populated by a batch TVF, and then STREAM from that table.

Hence I decided to just remove STREAM <tvf> support for now; we can revisit adding it back later if a concrete use case comes up.
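As a side note, a quick (purely illustrative, not a test from this PR) way to confirm that a built-in TVF such as range resolves to a batch logical plan:

```scala
// range(...) resolves to the batch Range logical plan, so the resulting
// DataFrame is not streaming.
assert(!spark.sql("SELECT * FROM range(10)").isStreaming)
```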
// If the function was marked as streaming in the SQL statement but it's determined
// through resolution that the underlying function does not actually produce a
// streaming relation, throw an unsupported streaming exception.
throw QueryCompilationErrors.unsupportedStreamingTVF(u.funcName)
To pass the fully qualified function name, we should do ident.toQualifiedNameParts(catalog) (this needs import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._).
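A sketch of what the suggestion amounts to (assuming ident and catalog are the resolved identifier and catalog already in scope in this Analyzer rule, and that unsupportedStreamingTVF accepts the qualified name parts):

```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Report the fully qualified function name instead of the raw name from the query.
throw QueryCompilationErrors.unsupportedStreamingTVF(ident.toQualifiedNameParts(catalog))
```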
LGTM except for some minor comments
Thanks, merging to master
What changes were proposed in this pull request?
Introduces SQL syntax support needed to implement Spark Declarative Pipelines (SPIP).
Spark Declarative Pipelines is a Spark sub-project that allows users to declaratively define data processing pipelines, using Python (PySpark) and/or SQL (Spark SQL). This PR implements the necessary Spark SQL syntax and commands to support the pipelines' SQL interface.
Creating Materialized Views

Syntax to create a materialized view, given a materialized view identifier, optionally specified column definitions (i.e. schema), optionally specified create table options, and the query that defines the data that will populate the materialized view.

This syntax is parsed into the CreateMaterializedViewAsSelect logical plan, which cannot be executed by Spark's query engine and will throw an exception if attempted. Instead, the intention is for the logical plan to be interpreted by the pipelines module, in a future PR.
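A schematic example of the statement shape described above (clauses inferred from this description rather than copied from the PR; the authoritative grammar is in SqlBaseParser.g4, and all identifiers are placeholders). The statement parses but is not executable by the engine itself:

```scala
// Schematic only: parses into a CreateMaterializedViewAsSelect logical plan.
val mvPlan = spark.sessionState.sqlParser.parsePlan(
  """CREATE MATERIALIZED VIEW sales_summary (region STRING, total DOUBLE)
    |AS SELECT region, SUM(amount) AS total FROM sales GROUP BY region
    |""".stripMargin)
```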
Creating Streaming Tables

Syntax to create a streaming table, given a streaming table identifier, optionally specified column definitions (i.e. schema), optionally specified create table options, and optionally the query that defines the data that will populate the streaming table.

It is allowed for a streaming table to be defined without a query, as streaming tables' data can be backed by standalone flows (see below). During a pipeline execution, it will be validated that a streaming table has at least one standalone flow writing to the table if no query is specified in the create statement itself.

This syntax is parsed into either the CreateStreamingTableAsSelect or CreateStreamingTable logical plan, which cannot be executed by Spark's query engine and will throw an exception if attempted. Instead, the intention is for the logical plan to be interpreted by the pipelines module, in a future PR.
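Schematic examples of the two shapes described above (again inferred from this description rather than copied from the PR; identifiers are placeholders, and STREAM usage follows the discussion earlier in this thread):

```scala
// With a defining query: parses into CreateStreamingTableAsSelect.
spark.sessionState.sqlParser.parsePlan(
  "CREATE STREAMING TABLE events_st AS SELECT * FROM STREAM raw_events")

// Without a query (to be populated by standalone flows): parses into CreateStreamingTable.
spark.sessionState.sqlParser.parsePlan(
  "CREATE STREAMING TABLE events_st (id BIGINT, ts TIMESTAMP)")
```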
Creating Append Flows

Syntax to create an append (insert into) flow, given a flow identifier, that inserts data defined by a query into some destination dataset identified by a flow destination identifier.

This syntax is parsed into the CreateFlowCommand logical plan, which cannot be executed by Spark's query engine and will throw an exception if attempted. Instead, the intention is for the logical plan to be interpreted by the pipelines module, in a future PR.
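A schematic sketch of such a statement (the flow name, destination, and exact INSERT clause shape are illustrative assumptions based on this description, not the literal grammar):

```scala
// Schematic only: parses into a CreateFlowCommand logical plan.
spark.sessionState.sqlParser.parsePlan(
  "CREATE FLOW backfill_events AS INSERT INTO events_st SELECT * FROM STREAM staged_events")
```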
Annotating Logical Plans as Streaming

Syntax to mark an [unresolved] relation or table-valued function as streaming in SQL. Since streaming tables must read from streaming sources in their query, the STREAM keyword assists with marking batch data sources as streaming, so that streaming tables can treat them as streaming data sources.
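For example (the STREAM t form is taken from the grammar excerpt discussed above; the table name is a placeholder):

```scala
// Marks the batch table `orders` as a streaming source for this query.
val streamingOrders = spark.sql("SELECT * FROM STREAM orders")
assert(streamingOrders.isStreaming)
```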
Why are the changes needed?

These changes introduce the SQL API for the pipelines project.
Does this PR introduce any user-facing change?
Yes, this change introduces new Spark SQL syntax that can be parsed by the Spark SQL parser, but cannot be executed by Spark's query engine - the corresponding logical plans will instead be interpreted by the Spark Declarative Pipelines project.
How was this patch tested?
org.apache.spark.sql.catalyst.analysis.StreamRelationSuite
org.apache.spark.sql.execution.command.CreateFlowCommandSuite
org.apache.spark.sql.execution.command.CreateStreamingTableAsSelectParserSuite
org.apache.spark.sql.execution.command.CreateMaterializedViewAsSelectParserSuite
org.apache.spark.sql.execution.command.v2.CreateStreamingTableParserSuite
Was this patch authored or co-authored using generative AI tooling?