[SPARK-52110][SDP][SQL] Implement SQL syntax support for pipelines #50875

Closed
16 changes: 16 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -5853,6 +5853,16 @@
"CONTINUE exception handler is not supported. Use EXIT handler."
]
},
"CREATE_FLOW_QUERY_EXECUTION" : {
"message" : [
"Direct execution of a CREATE FLOW statement is not supported. To create a flow, create and execute a pipeline with a SQL file containing your statement attached in the pipeline spec."
]
},
"CREATE_PIPELINE_DATASET_QUERY_EXECUTION" : {
"message" : [
"Direct execution of a CREATE ... <pipelineDatasetType> query. To create a pipeline dataset, create and execute a pipeline with a SQL file containing your query attached in the pipeline definition."
]
},
"DESC_TABLE_COLUMN_JSON" : {
"message" : [
"DESC TABLE COLUMN AS JSON not supported for individual columns."
@@ -6326,6 +6336,12 @@
],
"sqlState" : "0A000"
},
"UNSUPPORTED_STREAMING_TABLE_VALUED_FUNCTION" : {
"message" : [
"The function <funcName> does not support streaming. Please remove the STREAM keyword"
],
"sqlState" : "42000"
},
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY" : {
"message" : [
"Unsupported subquery expression:"
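For context, a minimal sketch of how the first two new conditions would surface, assuming a plain `SparkSession` outside any pipeline (all table and flow names below are hypothetical, not from this PR). The third condition fires when the `STREAM` keyword is applied to a table-valued function that has no streaming implementation.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: hypothetical names, run outside a declarative pipeline.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// CREATE_FLOW_QUERY_EXECUTION: flows may only be defined in a pipeline's SQL files.
spark.sql("CREATE FLOW ingest AS INSERT INTO daily_totals SELECT * FROM STREAM events")

// CREATE_PIPELINE_DATASET_QUERY_EXECUTION: likewise for materialized views and
// streaming tables (<pipelineDatasetType> is filled in with the dataset kind).
spark.sql("CREATE MATERIALIZED VIEW daily_totals AS SELECT count(*) AS n FROM events")
```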
4 changes: 4 additions & 0 deletions docs/sql-ref-ansi-compliance.md
@@ -532,6 +532,7 @@ Below is a list of all the keywords in Spark SQL.
|FILEFORMAT|non-reserved|non-reserved|non-reserved|
|FIRST|non-reserved|non-reserved|non-reserved|
|FLOAT|non-reserved|non-reserved|reserved|
|FLOW|non-reserved|non-reserved|non-reserved|
|FOLLOWING|non-reserved|non-reserved|non-reserved|
|FOR|reserved|non-reserved|reserved|
|FOREIGN|reserved|non-reserved|reserved|
@@ -604,6 +605,7 @@
|MACRO|non-reserved|non-reserved|non-reserved|
|MAP|non-reserved|non-reserved|non-reserved|
|MATCHED|non-reserved|non-reserved|non-reserved|
|MATERIALIZED|non-reserved|non-reserved|non-reserved|
|MAX|non-reserved|non-reserved|non-reserved|
|MERGE|non-reserved|non-reserved|non-reserved|
|MICROSECOND|non-reserved|non-reserved|non-reserved|
@@ -723,6 +725,8 @@ Below is a list of all the keywords in Spark SQL.
|STATISTICS|non-reserved|non-reserved|non-reserved|
|STORED|non-reserved|non-reserved|non-reserved|
|STRATIFY|non-reserved|non-reserved|non-reserved|
|STREAM|non-reserved|non-reserved|non-reserved|
|STREAMING|non-reserved|non-reserved|non-reserved|
|STRING|non-reserved|non-reserved|non-reserved|
|STRUCT|non-reserved|non-reserved|non-reserved|
|SUBSTR|non-reserved|non-reserved|non-reserved|
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -245,6 +245,7 @@ FILTER: 'FILTER';
FILEFORMAT: 'FILEFORMAT';
FIRST: 'FIRST';
FLOAT: 'FLOAT';
FLOW: 'FLOW';
FOLLOWING: 'FOLLOWING';
FOR: 'FOR';
FOREIGN: 'FOREIGN';
@@ -318,6 +319,7 @@ LOOP: 'LOOP';
MACRO: 'MACRO';
MAP: 'MAP' {incComplexTypeLevelCounter();};
MATCHED: 'MATCHED';
MATERIALIZED: 'MATERIALIZED';
MAX: 'MAX';
MERGE: 'MERGE';
MICROSECOND: 'MICROSECOND';
@@ -436,6 +438,8 @@ START: 'START';
STATISTICS: 'STATISTICS';
STORED: 'STORED';
STRATIFY: 'STRATIFY';
STREAM: 'STREAM';
STREAMING: 'STREAMING';
STRING: 'STRING';
STRUCT: 'STRUCT' {incComplexTypeLevelCounter();};
SUBSTR: 'SUBSTR';
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -349,6 +349,30 @@ statement
(functionArgument (COMMA functionArgument)*)?
RIGHT_PAREN #call
| unsupportedHiveNativeCommands .*? #failNativeCommand
| createPipelineDatasetHeader (LEFT_PAREN tableElementList? RIGHT_PAREN)? tableProvider?
createTableClauses
(AS query)? #createPipelineDataset
| createPipelineFlowHeader insertInto query #createPipelineInsertIntoFlow
;

materializedView
: MATERIALIZED VIEW
;

streamingTable
: STREAMING TABLE
;

createPipelineDatasetHeader
: CREATE
(materializedView | streamingTable)
(IF errorCapturingNot EXISTS)?
identifierReference
;

streamRelationPrimary
: STREAM multipartIdentifier tableAlias optionsClause? #streamTableName
| STREAM LEFT_PAREN multipartIdentifier RIGHT_PAREN tableAlias optionsClause? #streamTableName
;

setResetStatement
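To make the new surface area concrete, here is a sketch of statement shapes these rules admit, reusing the `spark` session from the earlier sketch (identifiers are hypothetical; as the error conditions above note, the CREATE forms are only accepted inside a pipeline):

```scala
// Shapes admitted by createPipelineDataset and streamRelationPrimary.
// Illustrative only; `src`, `mv` and `st` are made-up names.
val pipelineDdl = Seq(
  "CREATE MATERIALIZED VIEW IF NOT EXISTS mv AS SELECT id FROM src",
  "CREATE STREAMING TABLE st AS SELECT * FROM STREAM src")

// STREAM is usable wherever a relation is expected; both forms mark the
// relation as streaming (assumes `src` is a streaming-capable table):
spark.sql("SELECT * FROM STREAM src")
spark.sql("SELECT * FROM STREAM(src)")
```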
@@ -514,6 +538,10 @@ partitionVal
| identifier EQ DEFAULT
;

createPipelineFlowHeader
: CREATE FLOW flowName=identifierReference (commentSpec)? AS
;

namespace
: NAMESPACE
| DATABASE
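The flow header composes with the existing `insertInto` rule via the `createPipelineInsertIntoFlow` alternative above, so a complete statement would look roughly like this (hypothetical names again):

```scala
// createPipelineFlowHeader + insertInto + query; only valid inside a pipeline.
val flowDdl =
  "CREATE FLOW hourly_load COMMENT 'append new events' AS " +
    "INSERT INTO target_table SELECT * FROM STREAM source_table"
```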
@@ -973,7 +1001,8 @@ identifierComment
;

relationPrimary
: identifierReference temporalClause?
: streamRelationPrimary #streamRelation
| identifierReference temporalClause?
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
@@ -1769,6 +1798,7 @@ ansiNonReserved
| FILEFORMAT
| FIRST
| FLOAT
| FLOW
| FOLLOWING
| FORMAT
| FORMATTED
@@ -1825,6 +1855,7 @@ ansiNonReserved
| MACRO
| MAP
| MATCHED
| MATERIALIZED
| MAX
| MERGE
| MICROSECOND
@@ -1927,6 +1958,8 @@ ansiNonReserved
| STRATIFY
| STRING
| STRUCT
| STREAM
| STREAMING
| SUBSTR
| SUBSTRING
| SYNC
@@ -2139,6 +2172,7 @@ nonReserved
| FILEFORMAT
| FIRST
| FLOAT
| FLOW
| FOLLOWING
| FOR
| FOREIGN
@@ -2206,6 +2240,7 @@ nonReserved
| MACRO
| MAP
| MATCHED
| MATERIALIZED
| MAX
| MERGE
| MICROSECOND
@@ -2319,6 +2354,8 @@ nonReserved
| STATISTICS
| STORED
| STRATIFY
| STREAM
| STREAMING
| STRING
| STRUCT
| SUBSTR
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -40,7 +40,8 @@ object UnsupportedOperationChecker extends Logging {
def checkForBatch(plan: LogicalPlan): Unit = {
plan.foreachUp {
case p if p.isStreaming =>
throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
throwError("Queries with streaming sources must be executed with writeStream.start(), or " +
"from a streaming table or flow definition within a Spark Declarative Pipeline.")(p)

case d: DeduplicateWithinWatermark =>
throwError("dropDuplicatesWithinWatermark is not supported with batch " +
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -862,7 +862,7 @@ class AstBuilder extends DataTypeAstBuilder
* }}}
* operation to logical plan
*/
private def withInsertInto(
protected def withInsertInto(
ctx: InsertIntoContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
ctx match {
@@ -878,7 +878,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx = insertParams.relationCtx,
ident = ident,
optionsClause = insertParams.options,
writePrivileges = Seq(TableWritePrivilege.INSERT)),
writePrivileges = Seq(TableWritePrivilege.INSERT),
isStreaming = false),
partitionSpec = insertParams.partitionSpec,
userSpecifiedCols = insertParams.userSpecifiedCols,
query = otherPlans.head,
@@ -894,7 +895,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx = insertParams.relationCtx,
ident = ident,
optionsClause = insertParams.options,
writePrivileges = Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)),
writePrivileges = Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE),
isStreaming = false),
partitionSpec = insertParams.partitionSpec,
userSpecifiedCols = insertParams.userSpecifiedCols,
query = otherPlans.head,
@@ -907,7 +909,7 @@ class AstBuilder extends DataTypeAstBuilder
withIdentClause(ctx.identifierReference, Seq(query), (ident, otherPlans) => {
OverwriteByExpression.byPosition(
createUnresolvedRelation(ctx.identifierReference, ident, options,
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE)),
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false),
otherPlans.head,
expression(ctx.whereClause().booleanExpression()))
})
@@ -2385,6 +2387,17 @@ class AstBuilder extends DataTypeAstBuilder
})
}

override def visitStreamTableName(ctx: StreamTableNameContext): LogicalPlan = {
val ident = visitMultipartIdentifier(ctx.multipartIdentifier)
val tableStreamingRelation = createUnresolvedRelation(
ctx = ctx,
ident = ident,
optionsClause = Option(ctx.optionsClause),
writePrivileges = Seq.empty,
isStreaming = true)
mayApplyAliasPlan(ctx.tableAlias, tableStreamingRelation)
}

/**
* Create an inline table (a virtual table in Hive parlance).
*/
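A quick way to check the new visitor, reusing the `spark` session from the sketches above (`events` is a hypothetical name): the `STREAM` relation should parse to an `UnresolvedRelation` with `isStreaming = true`, with any alias or `OPTIONS` clause applied on top.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

val plan = spark.sessionState.sqlParser.parsePlan("SELECT * FROM STREAM events")
assert(plan.collectLeaves().exists {
  case r: UnresolvedRelation => r.isStreaming
  case _ => false
})
```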
@@ -3635,9 +3648,10 @@ class AstBuilder extends DataTypeAstBuilder
ctx: ParserRuleContext,
ident: Seq[String],
optionsClause: Option[OptionsClauseContext],
writePrivileges: Seq[TableWritePrivilege]): UnresolvedRelation = withOrigin(ctx) {
writePrivileges: Seq[TableWritePrivilege],
isStreaming: Boolean): UnresolvedRelation = withOrigin(ctx) {
val options = resolveOptions(optionsClause)
val relation = new UnresolvedRelation(ident, options, isStreaming = false)
val relation = new UnresolvedRelation(ident, options, isStreaming)
relation.requireWritePrivileges(writePrivileges)
}

@@ -4791,7 +4805,7 @@ class AstBuilder extends DataTypeAstBuilder
(fileFormatSerdeInfo ++ rowFormatSerdeInfo).reduceLeftOption((l, r) => l.merge(r))
}

private def partitionExpressions(
protected[sql] def partitionExpressions(
partTransforms: Seq[Transform],
partCols: Seq[ColumnDefinition],
ctx: ParserRuleContext): Seq[Transform] = {
@@ -5781,7 +5795,12 @@ class AstBuilder extends DataTypeAstBuilder
CacheTableAsSelect(ident.head, children.head, source(ctx.query()), isLazy, options)
} else {
CacheTable(
createUnresolvedRelation(ctx.identifierReference, ident, None, writePrivileges = Nil),
createUnresolvedRelation(
ctx.identifierReference,
ident,
None,
writePrivileges = Nil,
isStreaming = false),
ident, isLazy, options)
}
})