Commit 315be9f

anishm-db, AnishMahto, and gengliangwang committed
[SPARK-52110][SDP][SQL] Implement SQL syntax support for pipelines
### What changes were proposed in this pull request?

Introduces the SQL syntax support needed to implement Spark Declarative Pipelines ([SPIP](https://issues.apache.org/jira/browse/SPARK-51727)). Spark Declarative Pipelines is a Spark sub-project that allows users to declaratively define data processing pipelines, using Python (PySpark) and/or SQL (Spark SQL). This PR implements the Spark SQL syntax and commands needed to support the pipelines' SQL interface. Illustrative examples of each new statement follow the syntax descriptions below.

## Creating Materialized Views

```
CREATE MATERIALIZED VIEW [materialized view identifier]
[column definitions]?
[create table options]?
AS [query]
```

Syntax to create a materialized view with identifier `materialized view identifier`, optionally specified column definitions (i.e., schema) `column definitions`, optionally specified table options `create table options`, and the query `query` that defines the data that will populate the materialized view.

This syntax is parsed into the `CreateMaterializedViewAsSelect` logical plan, which **cannot** be executed by Spark's query engine and will throw an exception if execution is attempted. Instead, the intention is for the logical plan to be _interpreted_ by the pipelines module, in a future PR.

## Creating Streaming Tables

```
CREATE STREAMING TABLE [streaming table identifier]
[column definitions]?
[create table options]?
(AS [query])?
```

Syntax to create a streaming table with identifier `streaming table identifier`, optionally specified column definitions (i.e., schema) `column definitions`, optionally specified table options `create table options`, and optionally the query `query` that defines the data that will populate the streaming table.

A streaming table may be defined without a query, since a streaming table's data can be backed by standalone flows (see below). If no query is specified in the CREATE statement itself, pipeline execution will validate that at least one standalone flow writes to the streaming table.

This syntax is parsed into either the `CreateStreamingTableAsSelect` or the `CreateStreamingTable` logical plan, which **cannot** be executed by Spark's query engine and will throw an exception if execution is attempted. Instead, the intention is for the logical plan to be _interpreted_ by the pipelines module, in a future PR.

## Creating Append Flows

```
CREATE FLOW [flow identifier] AS
INSERT INTO [flow destination identifier] BY NAME
[query]
```

Syntax to create an append (insert into) flow with identifier `flow identifier` that inserts the data defined by `query` into a destination dataset with identifier `flow destination identifier`.

This syntax is parsed into the `CreateFlowCommand` logical plan, which **cannot** be executed by Spark's query engine and will throw an exception if execution is attempted. Instead, the intention is for the logical plan to be _interpreted_ by the pipelines module, in a future PR.

## Annotating Logical Plans as Streaming

```
STREAM ([relation | TVF])
STREAM [relation | TVF]
```

Syntax to mark an [unresolved] relation or table-valued function (TVF) as streaming in SQL. Since streaming tables must read from streaming sources in their queries, the `STREAM` keyword marks batch data sources as streaming, so that streaming tables can treat them as streaming data sources.
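
A minimal sketch of the materialized view syntax (all table and column names are hypothetical; the statement is interpreted by a pipeline, not executed directly):

```sql
-- Materialized view with an explicit schema, a table comment, and a defining query.
CREATE MATERIALIZED VIEW sales_by_region (
  region STRING,
  total_sales DOUBLE
)
COMMENT 'Aggregated sales per region'
AS SELECT region, SUM(amount) AS total_sales
FROM sales
GROUP BY region;
```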
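
A minimal sketch of the streaming table syntax, with and without a backing query (names hypothetical):

```sql
-- Streaming table defined by its own query, reading a source as a stream.
CREATE STREAMING TABLE events_bronze
AS SELECT * FROM STREAM raw_events;

-- Streaming table defined without a query; at least one standalone flow
-- must write to it, which pipeline execution will validate.
CREATE STREAMING TABLE events_silver (id BIGINT, ts TIMESTAMP);
```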
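
A minimal sketch of the append flow syntax, targeting the query-less streaming table above (names hypothetical):

```sql
-- Standalone flow appending rows into events_silver, matched by column name.
CREATE FLOW backfill_events AS
INSERT INTO events_silver BY NAME
SELECT id, ts FROM STREAM backfill_source;
```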
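
A minimal sketch of the STREAM annotation on a relation (table name hypothetical):

```sql
-- Both forms mark the batch table raw_events as a streaming source.
SELECT * FROM STREAM raw_events;
SELECT * FROM STREAM(raw_events);
```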

### Why are the changes needed?

These changes introduce the SQL API for the pipelines project.

### Does this PR introduce _any_ user-facing change?

Yes, this change introduces new Spark SQL syntax that can be parsed by the Spark SQL parser but cannot be executed by Spark's query engine; the corresponding logical plans will instead be interpreted by the Spark Declarative Pipelines project.

### How was this patch tested?

- `org.apache.spark.sql.catalyst.analysis.StreamRelationSuite`
- `org.apache.spark.sql.execution.command.CreateFlowCommandSuite`
- `org.apache.spark.sql.execution.command.CreateStreamingTableAsSelectParserSuite`
- `org.apache.spark.sql.execution.command.CreateMaterializedViewAsSelectParserSuite`
- `org.apache.spark.sql.execution.command.v2.CreateStreamingTableParserSuite`

### Was this patch authored or co-authored using generative AI tooling?

Closes #50875 from AnishMahto/add_spark_declarative_pipelines_sql_syntax_support.

Lead-authored-by: anishm-db <anish.mahto@databricks.com>
Co-authored-by: Anish Mahto <anish.mahto@databricks.com>
Co-authored-by: AnishMahto <anish.mahto99@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 4b3a653 commit 315be9f

25 files changed: +1164 −14 lines

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 16 additions & 0 deletions
@@ -5876,6 +5876,16 @@
       "CONTINUE exception handler is not supported. Use EXIT handler."
     ]
   },
+  "CREATE_FLOW_QUERY_EXECUTION" : {
+    "message" : [
+      "Direct execution of a CREATE FLOW statement is not supported. To create a flow, create and execute a pipeline with a SQL file containing your statement attached in the pipeline spec."
+    ]
+  },
+  "CREATE_PIPELINE_DATASET_QUERY_EXECUTION" : {
+    "message" : [
+      "Direct execution of a CREATE ... <pipelineDatasetType> query is not supported. To create a pipeline dataset, create and execute a pipeline with a SQL file containing your query attached in the pipeline definition."
+    ]
+  },
   "DESC_TABLE_COLUMN_JSON" : {
     "message" : [
       "DESC TABLE COLUMN AS JSON not supported for individual columns."
@@ -6349,6 +6359,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "UNSUPPORTED_STREAMING_TABLE_VALUED_FUNCTION" : {
+    "message" : [
+      "The function <funcName> does not support streaming. Please remove the STREAM keyword."
+    ],
+    "sqlState" : "42000"
+  },
   "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : {
     "message" : [
       "Unsupported subquery expression:"

docs/sql-ref-ansi-compliance.md

Lines changed: 4 additions & 0 deletions
@@ -532,6 +532,7 @@ Below is a list of all the keywords in Spark SQL.
 |FILEFORMAT|non-reserved|non-reserved|non-reserved|
 |FIRST|non-reserved|non-reserved|non-reserved|
 |FLOAT|non-reserved|non-reserved|reserved|
+|FLOW|non-reserved|non-reserved|non-reserved|
 |FOLLOWING|non-reserved|non-reserved|non-reserved|
 |FOR|reserved|non-reserved|reserved|
 |FOREIGN|reserved|non-reserved|reserved|
@@ -604,6 +605,7 @@ Below is a list of all the keywords in Spark SQL.
 |MACRO|non-reserved|non-reserved|non-reserved|
 |MAP|non-reserved|non-reserved|non-reserved|
 |MATCHED|non-reserved|non-reserved|non-reserved|
+|MATERIALIZED|non-reserved|non-reserved|non-reserved|
 |MAX|non-reserved|non-reserved|non-reserved|
 |MERGE|non-reserved|non-reserved|non-reserved|
 |MICROSECOND|non-reserved|non-reserved|non-reserved|
@@ -723,6 +725,8 @@ Below is a list of all the keywords in Spark SQL.
 |STATISTICS|non-reserved|non-reserved|non-reserved|
 |STORED|non-reserved|non-reserved|non-reserved|
 |STRATIFY|non-reserved|non-reserved|non-reserved|
+|STREAM|non-reserved|non-reserved|non-reserved|
+|STREAMING|non-reserved|non-reserved|non-reserved|
 |STRING|non-reserved|non-reserved|non-reserved|
 |STRUCT|non-reserved|non-reserved|non-reserved|
 |SUBSTR|non-reserved|non-reserved|non-reserved|
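
Because all four keywords are added as non-reserved, existing queries that use them as identifiers should continue to parse; a minimal sketch (table and column names hypothetical):

```sql
-- STREAM and FLOW remain valid as ordinary identifiers.
CREATE TABLE metrics (stream STRING, flow INT);
SELECT stream, flow FROM metrics;
```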

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4

Lines changed: 4 additions & 0 deletions
@@ -245,6 +245,7 @@ FILTER: 'FILTER';
 FILEFORMAT: 'FILEFORMAT';
 FIRST: 'FIRST';
 FLOAT: 'FLOAT';
+FLOW: 'FLOW';
 FOLLOWING: 'FOLLOWING';
 FOR: 'FOR';
 FOREIGN: 'FOREIGN';
@@ -318,6 +319,7 @@ LOOP: 'LOOP';
 MACRO: 'MACRO';
 MAP: 'MAP' {incComplexTypeLevelCounter();};
 MATCHED: 'MATCHED';
+MATERIALIZED: 'MATERIALIZED';
 MAX: 'MAX';
 MERGE: 'MERGE';
 MICROSECOND: 'MICROSECOND';
@@ -436,6 +438,8 @@ START: 'START';
 STATISTICS: 'STATISTICS';
 STORED: 'STORED';
 STRATIFY: 'STRATIFY';
+STREAM: 'STREAM';
+STREAMING: 'STREAMING';
 STRING: 'STRING';
 STRUCT: 'STRUCT' {incComplexTypeLevelCounter();};
 SUBSTR: 'SUBSTR';

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 38 additions & 1 deletion
@@ -349,6 +349,30 @@ statement
         (functionArgument (COMMA functionArgument)*)?
         RIGHT_PAREN #call
     | unsupportedHiveNativeCommands .*? #failNativeCommand
+    | createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
+        createTableClauses
+        (AS query)? #createPipelineDataset
+    | createPipelineFlowHeader insertInto query #createPipelineInsertIntoFlow
+    ;
+
+materializedView
+    : MATERIALIZED VIEW
+    ;
+
+streamingTable
+    : STREAMING TABLE
+    ;
+
+createPipelineDatasetHeader
+    : CREATE
+      (materializedView | streamingTable)
+      (IF errorCapturingNot EXISTS)?
+      identifierReference
+    ;
+
+streamRelationPrimary
+    : STREAM multipartIdentifier tableAlias optionsClause? #streamTableName
+    | STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN tableAlias optionsClause? #streamTableName
     ;
 
 setResetStatement
@@ -514,6 +538,10 @@ partitionVal
     | identifier EQ DEFAULT
     ;
 
+createPipelineFlowHeader
+    : CREATE FLOW flowName=identifierReference (commentSpec)? AS
+    ;
+
 namespace
     : NAMESPACE
     | DATABASE
@@ -973,7 +1001,8 @@ identifierComment
     ;
 
 relationPrimary
-    : identifierReference temporalClause?
+    : streamRelationPrimary #streamRelation
+    | identifierReference temporalClause?
       optionsClause? sample? tableAlias #tableName
     | LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
     | LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
@@ -1769,6 +1798,7 @@ ansiNonReserved
     | FILEFORMAT
     | FIRST
     | FLOAT
+    | FLOW
     | FOLLOWING
     | FORMAT
     | FORMATTED
@@ -1825,6 +1855,7 @@ ansiNonReserved
     | MACRO
     | MAP
     | MATCHED
+    | MATERIALIZED
     | MAX
     | MERGE
     | MICROSECOND
@@ -1927,6 +1958,8 @@ ansiNonReserved
     | STRATIFY
     | STRING
     | STRUCT
+    | STREAM
+    | STREAMING
     | SUBSTR
     | SUBSTRING
     | SYNC
@@ -2139,6 +2172,7 @@ nonReserved
     | FILEFORMAT
     | FIRST
     | FLOAT
+    | FLOW
     | FOLLOWING
     | FOR
     | FOREIGN
@@ -2206,6 +2240,7 @@ nonReserved
     | MACRO
     | MAP
     | MATCHED
+    | MATERIALIZED
     | MAX
     | MERGE
     | MICROSECOND
@@ -2319,6 +2354,8 @@ nonReserved
     | STATISTICS
     | STORED
     | STRATIFY
+    | STREAM
+    | STREAMING
     | STRING
     | STRUCT
     | SUBSTR
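
Putting the new productions together, a hedged sketch of statements this grammar accepts (all identifiers hypothetical):

```sql
-- createPipelineDataset: optional IF NOT EXISTS, column list, and create table clauses.
CREATE MATERIALIZED VIEW IF NOT EXISTS mv (id INT COMMENT 'key column')
COMMENT 'example materialized view'
AS SELECT 1 AS id;

-- createPipelineInsertIntoFlow: flow header (with optional commentSpec), insertInto, query.
CREATE FLOW f COMMENT 'appends new rows' AS
INSERT INTO st BY NAME
SELECT * FROM STREAM src;

-- streamRelationPrimary: bare and parenthesized STREAM forms, with an alias.
SELECT * FROM STREAM src;
SELECT * FROM STREAM(src) AS s;
```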

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 2 additions & 1 deletion
@@ -40,7 +40,8 @@ object UnsupportedOperationChecker extends Logging {
   def checkForBatch(plan: LogicalPlan): Unit = {
     plan.foreachUp {
       case p if p.isStreaming =>
-        throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
+        throwError("Queries with streaming sources must be executed with writeStream.start(), or " +
+          "from a streaming table or flow definition within a Spark Declarative Pipeline.")(p)
 
       case d: DeduplicateWithinWatermark =>
         throwError("dropDuplicatesWithinWatermark is not supported with batch " +

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 27 additions & 8 deletions
@@ -862,7 +862,7 @@ class AstBuilder extends DataTypeAstBuilder
    * }}}
    * operation to logical plan
    */
-  private def withInsertInto(
+  protected def withInsertInto(
       ctx: InsertIntoContext,
       query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
     ctx match {
@@ -878,7 +878,8 @@ class AstBuilder extends DataTypeAstBuilder
             ctx = insertParams.relationCtx,
             ident = ident,
             optionsClause = insertParams.options,
-            writePrivileges = Seq(TableWritePrivilege.INSERT)),
+            writePrivileges = Seq(TableWritePrivilege.INSERT),
+            isStreaming = false),
           partitionSpec = insertParams.partitionSpec,
           userSpecifiedCols = insertParams.userSpecifiedCols,
           query = otherPlans.head,
@@ -894,7 +895,8 @@ class AstBuilder extends DataTypeAstBuilder
             ctx = insertParams.relationCtx,
             ident = ident,
             optionsClause = insertParams.options,
-            writePrivileges = Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)),
+            writePrivileges = Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
+            isStreaming = false),
           partitionSpec = insertParams.partitionSpec,
           userSpecifiedCols = insertParams.userSpecifiedCols,
           query = otherPlans.head,
@@ -907,7 +909,7 @@ class AstBuilder extends DataTypeAstBuilder
       withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
         OverwriteByExpression.byPosition(
           createUnresolvedRelation(ctx.identifierReference, ident, options,
-            Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)),
+            Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false),
           otherPlans.head,
           expression(ctx.whereClause().booleanExpression()))
       })
@@ -2385,6 +2387,17 @@ class AstBuilder extends DataTypeAstBuilder
     })
   }
 
+  override def visitStreamTableName(ctx: StreamTableNameContext): LogicalPlan = {
+    val ident = visitMultipartIdentifier(ctx.multipartIdentifier)
+    val tableStreamingRelation = createUnresolvedRelation(
+      ctx = ctx,
+      ident = ident,
+      optionsClause = Option(ctx.optionsClause),
+      writePrivileges = Seq.empty,
+      isStreaming = true)
+    mayApplyAliasPlan(ctx.tableAlias, tableStreamingRelation)
+  }
+
   /**
    * Create an inline table (a virtual table in Hive parlance).
    */
@@ -3635,9 +3648,10 @@ class AstBuilder extends DataTypeAstBuilder
       ctx: ParserRuleContext,
       ident: Seq[String],
       optionsClause: Option[OptionsClauseContext],
-      writePrivileges: Seq[TableWritePrivilege]): UnresolvedRelation = withOrigin(ctx) {
+      writePrivileges: Seq[TableWritePrivilege],
+      isStreaming: Boolean): UnresolvedRelation = withOrigin(ctx) {
     val options = resolveOptions(optionsClause)
-    val relation = new UnresolvedRelation(ident, options, isStreaming = false)
+    val relation = new UnresolvedRelation(ident, options, isStreaming)
     relation.requireWritePrivileges(writePrivileges)
   }
 
@@ -4791,7 +4805,7 @@ class AstBuilder extends DataTypeAstBuilder
     (fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r))
   }
 
-  private def partitionExpressions(
+  protected[sql] def partitionExpressions(
       partTransforms: Seq[Transform],
       partCols: Seq[ColumnDefinition],
       ctx: ParserRuleContext): Seq[Transform] = {
@@ -5779,7 +5793,12 @@ class AstBuilder extends DataTypeAstBuilder
       CacheTableAsSelect(ident.head, children.head, source(ctx.query()), isLazy, options)
     } else {
       CacheTable(
-        createUnresolvedRelation(ctx.identifierReference, ident, None, writePrivileges = Nil),
+        createUnresolvedRelation(
+          ctx.identifierReference,
+          ident,
+          None,
+          writePrivileges = Nil,
+          isStreaming = false),
         ident, isLazy, options)
     }
   })
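
To make `visitStreamTableName` concrete, a minimal sketch of the two parse shapes it handles, each producing an `UnresolvedRelation` with `isStreaming = true` and the table alias applied (names hypothetical):

```sql
-- Bare form: STREAM multipartIdentifier tableAlias
SELECT e.payload FROM STREAM raw_events e;

-- Parenthesized form: STREAM ( multipartIdentifier ) tableAlias
SELECT e.payload FROM STREAM(raw_events) e;
```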
