
[SPARK-52110][SDP][SQL] Implement SQL syntax support for pipelines #50875


Conversation


@AnishMahto AnishMahto commented May 13, 2025

What changes were proposed in this pull request?

Introduces SQL syntax support needed to implement Spark Declarative Pipelines (SPIP).

Spark Declarative Pipelines is a Spark sub-project that allows users to declaratively define data processing pipelines using Python (PySpark) and/or SQL (Spark SQL). This PR implements the necessary Spark SQL syntax and commands to support the pipelines' SQL interface.

Creating Materialized Views

CREATE MATERIALIZED VIEW [materialized view identifier] [column definitions]? [create table options]? AS [query]

Syntax to create a materialized view with identifier materialized view identifier, optional column definitions (i.e. its schema), optional create table options, and the query that defines the data that will populate the materialized view.

This syntax is parsed into the CreateMaterializedViewAsSelect logical plan, which cannot be executed by Spark's query engine and will throw an exception if execution is attempted. Instead, the intention is for the logical plan to be interpreted by the pipelines module, in a future PR.
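
For example, a statement following this syntax could look like the sketch below; the identifiers, column definitions, and COMMENT clause are illustrative and not taken from the PR, assuming the usual create-table clauses are accepted here:

-- Hypothetical materialized view over a batch source.
CREATE MATERIALIZED VIEW daily_sales (
    sale_date DATE,
    total_amount DECIMAL(10, 2)
)
COMMENT 'Total sales per day'
AS SELECT sale_date, SUM(amount) AS total_amount
FROM raw_sales
GROUP BY sale_date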

Creating Streaming Tables

CREATE STREAMING TABLE [streaming table identifier] [column definitions]? [create table options]? (AS [query])?

Syntax to create a streaming table with identifier streaming table identifier, optional column definitions (i.e. its schema), optional create table options, and optionally the query that defines the data that will populate the streaming table.

A streaming table may be defined without a query, since streaming tables can also be backed by standalone flows (see below). If no query is specified in the create statement itself, pipeline execution validates that at least one standalone flow writes to the table.

This syntax is parsed into either the CreateStreamingTableAsSelect or CreateStreamingTable logical plan, which cannot be executed by Spark's query engine and will throw an exception if execution is attempted. Instead, the intention is for the logical plan to be interpreted by the pipelines module, in a future PR.
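
For example (identifiers are illustrative, not from the PR), a streaming table can be declared with or without a defining query:

-- With a query, reading raw_events as a streaming source via the STREAM keyword (see below).
CREATE STREAMING TABLE clean_events
AS SELECT * FROM STREAM raw_events WHERE event_id IS NOT NULL

-- Without a query; the table's data is expected to come from standalone flows (see below).
CREATE STREAMING TABLE audit_events (
    event_id BIGINT,
    event_time TIMESTAMP
)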

Creating Append Flows

CREATE FLOW [flow identifier] AS INSERT INTO BY NAME [flow destination identifier] [query]

Syntax to create an append (insert into) flow with identifier flow identifier, which inserts the data defined by query into the destination dataset identified by flow destination identifier.

This syntax is parsed into the CreateFlowCommand logical plan, which cannot be executed by Spark's query engine and will throw an exception if execution is attempted. Instead, the intention is for the logical plan to be interpreted by the pipelines module, in a future PR.
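
For example (identifiers are illustrative, not from the PR), a standalone flow that appends into the streaming table declared above might look like:

-- Hypothetical backfill flow writing into audit_events by column name.
CREATE FLOW backfill_audit_events
AS INSERT INTO BY NAME audit_events
SELECT event_id, event_time FROM STREAM archived_events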

Annotating Logical Plans as Streaming

STREAM ([relation | TVF])
STREAM [relation | TVF]

Syntax to mark an [unresolved] relation or table-valued function as streaming in SQL. Since streaming tables must read from streaming sources in their query, the STREAM keyword marks batch data sources as streaming so that streaming tables can consume them as streaming sources.
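
For example (the table name is illustrative), both forms below mark the relation as a streaming source so that it can back a streaming table or flow:

SELECT * FROM STREAM raw_events
SELECT * FROM STREAM(raw_events)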

Why are the changes needed?

These changes introduce the SQL API for the pipelines project.

Does this PR introduce any user-facing change?

Yes, this change introduces new Spark SQL syntax that can be parsed by the Spark SQL parser, but cannot be executed by Spark's query engine - the corresponding logical plans will instead be interpreted by the Spark Declarative Pipelines project.

How was this patch tested?

org.apache.spark.sql.catalyst.analysis.StreamRelationSuite
org.apache.spark.sql.execution.command.CreateFlowCommandSuite
org.apache.spark.sql.execution.command.CreateStreamingTableAsSelectParserSuite
org.apache.spark.sql.execution.command.CreateMaterializedViewAsSelectParserSuite
org.apache.spark.sql.execution.command.v2.CreateStreamingTableParserSuite

Was this patch authored or co-authored using generative AI tooling?

@AnishMahto
Contributor Author

;

streamRelationPrimary
: STREAM multipartIdentifier tableAlias #streamTableName
Contributor

shall we allow the optionsClause? For the Scala API, streaming table scan can have options: spark.readStream.options(...).table("t"). We should have the same functionality in SQL syntax.

Contributor Author

Hmm, a couple thoughts:

  1. STREAM is an operator that simply marks an existing table or TVF as desired to be treated as a streaming source during analysis/execution.
    TVFs aside, in the case of a table [referenced by its metastore identifier] the table must have been created in the metastore and therefore should already have its streaming read options defined during creation - ex. CREATE TABLE t USING <provider> OPTIONS(<streaming options applicable to this table provider>) ...
  2. A streaming DataFrame (whether it uses the STREAM operator or not) can only be executed using writeStream - attempting to directly [batch] execute a streaming DataFrame using something like df.queryExecution.executedPlan will throw. I also believe it's not possible to read directly from a streaming DataFrame, you'd have to write the streaming DF to a sink and then you can readStream from that sink with whatever desired streaming read options.
  3. Combining points 1 and 2, I believe when doing something like spark.sql("SELECT * FROM STREAM t").writeStream..., the streaming query that's executed to write t's data into the configured sink should respect any relevant options set on t (at creation time) when reading from it.

More generally, the purpose of introducing the STREAM operator is to support defining pipeline streaming flows from batch queries, by marking those batch queries as streaming. This will allow the pipeline to writeStream the batch query that backs the streaming flow into a sink, which would otherwise fail here.

Contributor

so we won't have a SQL equivalent for spark.readStream.options(...).table("t")? The table t can be created with options, but the scan time options should overwrite them. I think it's already the behavior of the DataFrame API. cc @HeartSaVioR

Contributor
@HeartSaVioR HeartSaVioR May 15, 2025

the table must have been created in the metastore and therefore should already have its streaming read options defined during creation

I'm sorry, I don't agree with this. You can have two streaming queries reading from the same table but intending to start from different offsets/versions/etc. There is definitely a need for query-specific options.

It's a bit of a different story whether we want to bind the options to the STREAM keyword, though - a TVF is easier to deal with since it already receives options. If we bind options to STREAM, the TVF has options and STREAM has options. (Even in that case we may resolve this via overriding, though.)

I just had a quick look at Flink (link) - they put this into a table option, but their CREATE TABLE does not always map to the table we refer to - it's closer to the table concept in DSv2, not a Parquet table, Delta table, Iceberg table, etc., which seems closer to @cloud-fan's proposal.

Contributor

WITH and STREAM are two orthogonal syntaxes:

FROM STREAM t
FROM t WITH (...)
FROM STREAM t WITH (...)

TVF is different as it can take options on its own, so we probably shouldn't allow FROM STREAM tvf(...) WITH (...)

Contributor Author
@AnishMahto AnishMahto May 16, 2025

Cool, added WITH syntax support when using STREAM on a table, but not for TVFs. Also added org.apache.spark.sql.execution.streaming.StreamRelationSuite to show the specified options are indeed propagated to the streaming relation's data source.
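
A sketch of the combined syntax, assuming the WITH options clause takes a parenthesized key/value list as discussed above (the table name and option key are illustrative):

-- Scan-time option overrides for the streaming read of raw_events.
SELECT * FROM STREAM raw_events WITH ('maxFilesPerTrigger' = '1')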

@gengliangwang
Member

@@ -37,9 +37,14 @@ import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
   */
 object UnsupportedOperationChecker extends Logging {

-  def checkForBatch(plan: LogicalPlan): Unit = {
+  def checkForBatch(plan: LogicalPlan, isSqlContext: Boolean): Unit = {
Member

Since there is an existing SQLContext, shall we rename the parameter to isPipeline?

Contributor Author

Since there is an existing SQLContext

Could you clarify what you mean by this?

Thinking about this more though, I think it still makes sense to show the "Queries with streaming sources must be executed with writeStream.start()" suggestion even if this is a SQL query (i.e. the query is being executed in a SQL context), and to additionally suggest executing the streaming query with a pipeline as an alternative - will make this change.

Member

I meant there is an existing class SQLContext (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/classic/SQLContext.scala), which exists in every Spark session.

Contributor Author

Ahh gotcha. Decided to just unify the error messages, since users don't have to use pipelines to execute SQL that contains streaming relations - they can still just use readStream.

@github-actions github-actions bot added the DOCS label May 16, 2025
@AnishMahto
Contributor Author

Heads up all, I made all of the SQL keywords added in this PR (FLOW, MATERIALIZED, STREAM, STREAMING) non-reserved. This should prevent breaking any existing workflows that create table identifiers using these words, for example.
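
For example (hypothetical identifiers), statements that already use these words as identifiers should continue to parse:

CREATE TABLE stream (flow STRING, materialized INT, streaming BOOLEAN)
SELECT flow, materialized, streaming FROM stream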

import org.apache.spark.sql.catalyst.plans.logical.{Project, SubqueryAlias}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class StreamRelationSuite extends AnalysisTest {
Member

This is a parser test suite instead of an analysis one.

Contributor Author

Related thread: #50875 (comment)

The suite has a combination of both pure parsing and parser + analysis tests.

I decided to split the tests in the suite - the pure parsing tests have been moved to org.apache.spark.sql.catalyst.parser.StreamRelationParserSuite, and the analysis and execution tests have been moved to org.apache.spark.sql.execution.streaming.StreamRelationSuite

@@ -2105,6 +2105,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
catalog, "table-valued functions")
}
}
if (u.isStreaming && !resolvedFunc.isStreaming) {
Contributor

how about the other way around? There is no STREAM keyword but the TVF is streaming. What do we do today? This is a bit unrelated to this PR though, cc @HeartSaVioR

Contributor

I don't know how we resolve TVFs nowadays - do we ever pass the notion of batch vs streaming "to" the TVF, or is the TVF expected to decide on its own?

Thinking back to how data sources work, Spark asks the data source to behave as batch or streaming. If a TVF can potentially handle both batch and streaming, it'd be more natural to do the same, instead of having the TVF decide on its own.

Contributor Author

These are great questions.

First of all, the intention of the STREAM keyword is to convert a batch relation into a streaming relation, the primary use case being that a batch source can then populate downstream streaming tables in pipelines. If a relation is already streaming, the STREAM keyword should just be a no-op.

Second, I think the TVF is responsible for defining whether it's streaming or not on its own. The TableFunctionRegistry trait at least is a registry of function names -> LogicalPlan, and the LogicalPlan backing the TVF can define whether it's streaming or not (ex. Range is not streaming). And it looks like the resolution of a TVF simply replaces it with the corresponding LogicalPlan mapped to that TVF in the registry, which then gets analyzed.

With my changes as-is, for TVFs I'm not converting the LogicalPlan (that backs the TVF, as per the function registry) to be streaming - I am just validating that when STREAM is used on a TVF, the resolved LogicalPlan for the TVF is indeed streaming - but I don't even know if any default/builtin functions in the registry are defined by a streaming logical plan anyway.

When I think about it, I don't actually think the pipelines module will need the ability to convert TVFs to streaming in the first place. In theory users can always define a table that is populated by a batch TVF, and then STREAM from that table.

Hence I decided to just remove STREAM <tvf> support for now; we can revisit adding it back later if a concrete use case comes up.

// If the function was marked as streaming in the SQL statement but it's determined
// through resolution that the underlying function does not actually produce a
// streaming relation, throw an unsupported streaming exception.
throw QueryCompilationErrors.unsupportedStreamingTVF(u.funcName)
Contributor

to pass the fully qualified function name, we should do ident.toQualifiedNameParts(catalog) (this needs import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._)

Contributor
@cloud-fan cloud-fan left a comment

LGTM except for some minor comments

@gengliangwang
Member

Thanks, merging to master
