
[SPARK-30214][SQL] A new framework to resolve v2 commands #26847


Closed · wants to merge 35 commits
a005713
[SPARK-30214][SQL] Support COMMENT ON syntax
yaooqinn Dec 11, 2019
7c45c9b
naming
yaooqinn Dec 11, 2019
024ab39
Merge branch 'master' into SPARK-30214
yaooqinn Dec 11, 2019
e489e62
fix tests
yaooqinn Dec 11, 2019
9b272b6
comments
yaooqinn Dec 11, 2019
85617cd
impl UnresolvedNamespace
yaooqinn Dec 17, 2019
e33a200
megre master
yaooqinn Dec 17, 2019
3e37941
impl UnresolvedV2Table
yaooqinn Dec 17, 2019
7df9407
Merge branch 'master' into SPARK-30214
yaooqinn Dec 17, 2019
57c83fc
rm parent node
yaooqinn Dec 19, 2019
ccc4702
create namespace
yaooqinn Dec 19, 2019
72c01a9
drop namespace
yaooqinn Dec 20, 2019
1a7c800
set namespace props
yaooqinn Dec 20, 2019
415de11
set namespace location
yaooqinn Dec 20, 2019
6ab2228
desc namespace
yaooqinn Dec 20, 2019
56037ff
show current namespace
yaooqinn Dec 20, 2019
6675e7f
Revert "show current namespace"
yaooqinn Dec 20, 2019
a42e12e
Revert "desc namespace"
yaooqinn Dec 20, 2019
2c458f0
Revert "set namespace location"
yaooqinn Dec 20, 2019
573a66e
Revert "set namespace props"
yaooqinn Dec 20, 2019
f6e742b
Revert "drop namespace"
yaooqinn Dec 20, 2019
0754656
Revert "create namespace"
yaooqinn Dec 20, 2019
a868420
ident -> NamedRelation
yaooqinn Dec 20, 2019
5a6aa2a
add ident
yaooqinn Dec 20, 2019
0b89d3a
resovledtable should lookup view => v2 -> v1
yaooqinn Dec 20, 2019
a41acc5
ref
yaooqinn Dec 27, 2019
de054c7
add DataSourceV1Relation
yaooqinn Dec 27, 2019
e0836f6
Revert "add DataSourceV1Relation"
yaooqinn Dec 27, 2019
fc555be
handle view in analyzer
yaooqinn Dec 27, 2019
7b4f3e3
nit in tests
yaooqinn Dec 27, 2019
9f2159f
Merge branch 'master' into SPARK-30214
yaooqinn Dec 30, 2019
70028dd
update
yaooqinn Dec 30, 2019
9d10239
nit
yaooqinn Dec 31, 2019
c391212
fix test
yaooqinn Dec 31, 2019
d20b1b2
Merge branch 'master' into SPARK-30214
yaooqinn Jan 3, 2020
@@ -215,6 +215,9 @@ statement
| (DESC | DESCRIBE) TABLE? option=(EXTENDED | FORMATTED)?
multipartIdentifier partitionSpec? describeColName? #describeTable
| (DESC | DESCRIBE) QUERY? query #describeQuery
| COMMENT ON namespace multipartIdentifier IS
comment=(STRING | NULL) #commentNamespace
| COMMENT ON TABLE multipartIdentifier IS comment=(STRING | NULL) #commentTable
| REFRESH TABLE multipartIdentifier #refreshTable
| REFRESH (STRING | .*?) #refreshResource
| CACHE LAZY? TABLE multipartIdentifier
Expand Down
@@ -37,7 +37,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -197,6 +198,7 @@ class Analyzer(
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveNamespace(catalogManager) ::
new ResolveCatalogs(catalogManager) ::
ResolveInsertInto ::
ResolveRelations ::
@@ -721,6 +723,14 @@ class Analyzer(
}
}

case class ResolveNamespace(catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
ResolvedNamespace(catalog.asNamespaceCatalog, ns)
}
Comment on lines +729 to +731

Member:

Does it mean the ParsedStatement from the parser will turn to use UnresolvedNamespace? Currently, the catalogs in statements are resolved at ResolveCatalogs. Will we need to refactor ResolveCatalogs due to this change?

Member Author:

Yes, see #26847 (comment)

We can have an extra rule to catch commands with v1 relations and convert them to v1 commands. This can help us get rid of the duplicated code between ResolveCatalogs and ResolveSessionCatalog.

}

private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty

/**
@@ -735,6 +745,11 @@
lookupTempView(ident)
.map(view => i.copy(table = view))
.getOrElse(i)
case u @ UnresolvedTable(ident) =>
lookupTempView(ident).foreach { _ =>
u.failAnalysis(s"${ident.quoted} is a temp view, not a table.")
}
u
}

def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
@@ -774,6 +789,11 @@
lookupV2Relation(u.multipartIdentifier)
.getOrElse(u)

case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
CatalogV2Util.loadTable(catalog, ident)
.map(ResolvedTable(catalog.asTableCatalog, ident, _))
.getOrElse(u)

case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
lookupV2Relation(u.multipartIdentifier)
.map(v2Relation => i.copy(table = v2Relation))
@@ -859,6 +879,18 @@

case u: UnresolvedRelation =>
lookupRelation(u.multipartIdentifier).map(resolveViews).getOrElse(u)

case u @ UnresolvedTable(identifier: Seq[String]) =>
expandRelationName(identifier) match {
case SessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident) match {
case Some(v1Table: V1Table) if v1Table.v1Table.tableType == CatalogTableType.VIEW =>
u.failAnalysis(s"$ident is a view, not a table.")
case Some(table) => ResolvedTable(catalog.asTableCatalog, ident, table)
case None => u
}
case _ => u
}
}

// Look up a relation from the session catalog with the following logic:
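The analyzer changes above establish a lookup order for `UnresolvedTable`: temp views are checked first (and rejected with an error), then the node is resolved against a catalog, and anything left unresolved is reported later by `CheckAnalysis`. The following is a standalone sketch of that order using simplified stand-in types, not Spark's actual classes:

```scala
// Simplified stand-ins for the analyzer's plan nodes (hypothetical, not Spark's API).
sealed trait Plan
case class UnresolvedTable(ident: Seq[String]) extends Plan
case class ResolvedTable(ident: Seq[String]) extends Plan
case class TempViewError(msg: String) extends Plan

// Resolution order: temp view check first (fails fast), then catalog lookup;
// a miss leaves the node unresolved for a later rule or CheckAnalysis.
def resolve(
    plan: Plan,
    tempViews: Set[Seq[String]],
    catalogTables: Set[Seq[String]]): Plan = plan match {
  case UnresolvedTable(ident) if tempViews(ident) =>
    TempViewError(s"${ident.mkString(".")} is a temp view, not a table.")
  case UnresolvedTable(ident) if catalogTables(ident) =>
    ResolvedTable(ident)
  case other => other // left as-is; CheckAnalysis reports it if still unresolved
}
```

The key design point mirrored here is that each rule only handles the cases it recognizes and passes everything else through unchanged, which is what lets the fixed-point batch compose many small rules safely.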
@@ -92,6 +92,12 @@ trait CheckAnalysis extends PredicateHelper {

case p if p.analyzed => // Skip already analyzed sub-plans

case u: UnresolvedNamespace =>
u.failAnalysis(s"Namespace not found: ${u.multipartIdentifier.quoted}")

case u: UnresolvedTable =>
u.failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.connector.catalog.SupportsNamespaces

case class ResolvedNamespace(catalog: SupportsNamespaces, namespace: Seq[String])
extends LeafNode {
override def output: Seq[Attribute] = Nil
}

case class UnresolvedNamespace(multipartIdentifier: Seq[String]) extends LeafNode {
override lazy val resolved: Boolean = false

override def output: Seq[Attribute] = Nil
}
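These paired node classes rely on the analyzer's resolved-flag contract: the placeholder reports `resolved = false` so resolution rules keep firing, and a final check rejects any plan that still contains an unresolved node. A minimal standalone sketch of that contract, with simplified names rather than Spark's real API:

```scala
// Hypothetical, simplified versions of the unresolved/resolved node pair.
sealed trait Node { def resolved: Boolean }
case class UnresolvedNamespace(parts: Seq[String]) extends Node {
  override val resolved: Boolean = false // keeps resolution rules trying
}
case class ResolvedNamespace(catalog: String, ns: Seq[String]) extends Node {
  override val resolved: Boolean = true
}

// Stand-in for CheckAnalysis: any surviving unresolved node becomes an error.
def checkAnalysis(node: Node): Either[String, Node] = node match {
  case u: UnresolvedNamespace =>
    Left(s"Namespace not found: ${u.parts.mkString(".")}")
  case resolvedNode => Right(resolvedNode)
}
```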
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

case class ResolvedTable(catalog: TableCatalog, identifier: Identifier, table: Table)
extends LeafNode {
override def output: Seq[Attribute] = Nil
}

case class UnresolvedTable(multipartIdentifier: Seq[String]) extends LeafNode {
override lazy val resolved: Boolean = false

override def output: Seq[Attribute] = Nil
}
@@ -3429,4 +3429,22 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
ctx.EXISTS != null,
ctx.TEMPORARY != null)
}

override def visitCommentNamespace(ctx: CommentNamespaceContext): LogicalPlan = withOrigin(ctx) {
val comment = ctx.comment.getType match {
case SqlBaseParser.NULL => ""
case _ => string(ctx.STRING)
}
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
CommentOnNamespace(UnresolvedNamespace(nameParts), comment)
}

override def visitCommentTable(ctx: CommentTableContext): LogicalPlan = withOrigin(ctx) {
val comment = ctx.comment.getType match {
case SqlBaseParser.NULL => ""
case _ => string(ctx.STRING)
}
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
CommentOnTable(UnresolvedTable(nameParts), comment)
}
}
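Both visitor methods encode the same convention: `COMMENT ... IS NULL` clears the comment (mapped to the empty string), while any string literal, including the literal text `'NULL'`, is taken verbatim. A tiny standalone sketch of that mapping, with illustrative token types rather than the generated ANTLR API:

```scala
// Hypothetical token representation; the real code inspects ANTLR token types.
sealed trait CommentToken
case object NullToken extends CommentToken
case class StringToken(value: String) extends CommentToken

def commentValue(tok: CommentToken): String = tok match {
  case NullToken      => "" // NULL drops the comment
  case StringToken(v) => v  // literal text, even if it spells "NULL"
}
```

This is why the parser test below expects `COMMENT ON DATABASE a.b.c IS 'NULL'` to produce the comment string `"NULL"` rather than an empty comment.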
@@ -17,10 +17,10 @@

package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.analysis.{NamedRelation, Star, UnresolvedException}
import org.apache.spark.sql.catalyst.analysis.{NamedRelation, UnresolvedException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, Unevaluable}
import org.apache.spark.sql.catalyst.plans.DescribeTableSchema
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, SupportsNamespaces, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, ColumnChange}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructType}
@@ -454,3 +454,31 @@ case class ShowTableProperties(
AttributeReference("key", StringType, nullable = false)(),
AttributeReference("value", StringType, nullable = false)())
}

/**
 * The logical plan that defines or changes the comment of a namespace for v2 catalogs.
 *
 * {{{
 *   COMMENT ON (DATABASE|SCHEMA|NAMESPACE) namespaceIdentifier IS ('text' | NULL)
 * }}}
 *
 * where `text` is the new comment written as a string literal, or `NULL` to drop the comment.
 */
case class CommentOnNamespace(child: LogicalPlan, comment: String) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}

/**
 * The logical plan that defines or changes the comment of a table for v2 catalogs.
 *
 * {{{
 *   COMMENT ON TABLE tableIdentifier IS ('text' | NULL)
 * }}}
 *
 * where `text` is the new comment written as a string literal, or `NULL` to drop the comment.
 */
case class CommentOnTable(child: LogicalPlan, comment: String) extends Command {
override def children: Seq[LogicalPlan] = child :: Nil
}
@@ -20,13 +20,12 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedRelation, UnresolvedStar}
import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

@@ -1983,4 +1982,22 @@ class DDLParserSuite extends AnalysisTest {
}
}
}

test("comment on") {
comparePlans(
parsePlan("COMMENT ON DATABASE a.b.c IS NULL"),
CommentOnNamespace(UnresolvedNamespace(Seq("a", "b", "c")), ""))

comparePlans(
parsePlan("COMMENT ON DATABASE a.b.c IS 'NULL'"),
CommentOnNamespace(UnresolvedNamespace(Seq("a", "b", "c")), "NULL"))

comparePlans(
parsePlan("COMMENT ON NAMESPACE a.b.c IS ''"),
CommentOnNamespace(UnresolvedNamespace(Seq("a", "b", "c")), ""))

comparePlans(
parsePlan("COMMENT ON TABLE a.b.c IS 'xYz'"),
CommentOnTable(UnresolvedTable(Seq("a", "b", "c")), "xYz"))
}
}
@@ -20,10 +20,11 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._

import org.apache.spark.sql.{AnalysisException, Strategy}
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, TableCapability}
import org.apache.spark.sql.catalyst.plans.logical.{AlterNamespaceSetProperties, AlterTable, AppendData, CommentOnNamespace, CommentOnTable, CreateNamespace, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeNamespace, DescribeTable, DropNamespace, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, RefreshTable, RenameTable, Repartition, ReplaceTable, ReplaceTableAsSelect, SetCatalogAndNamespace, ShowCurrentNamespace, ShowNamespaces, ShowTableProperties, ShowTables}
import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -33,6 +34,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
object DataSourceV2Strategy extends Strategy with PredicateHelper {

import DataSourceV2Implicits._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
@@ -210,6 +212,16 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
case AlterNamespaceSetProperties(catalog, namespace, properties) =>
AlterNamespaceSetPropertiesExec(catalog, namespace, properties) :: Nil

case CommentOnNamespace(ResolvedNamespace(catalog, namespace), comment) =>
AlterNamespaceSetPropertiesExec(
catalog,
namespace,
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil

case CommentOnTable(ResolvedTable(catalog, identifier, _), comment) =>
val changes = TableChange.setProperty(TableCatalog.PROP_COMMENT, comment)
AlterTableExec(catalog, identifier, Seq(changes)) :: Nil

case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil

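The strategy rules above show that COMMENT ON needs no dedicated physical operator: both variants reduce to setting a reserved `comment` property, mirroring `SupportsNamespaces.PROP_COMMENT` and `TableCatalog.PROP_COMMENT`. A standalone sketch of that reduction, with simplified types in place of Spark's catalog API:

```scala
// Hypothetical property key standing in for PROP_COMMENT.
val PropComment = "comment"

// COMMENT ON ... IS 'text' becomes a plain property upsert; COMMENT ON ... IS
// NULL arrives as "" from the parser and overwrites the old comment the same way.
def applyCommentChange(
    properties: Map[String, String],
    comment: String): Map[String, String] =
  properties + (PropComment -> comment)
```

Reusing the existing `AlterNamespaceSetPropertiesExec` and `AlterTableExec` operators keeps the v2 command surface small: any catalog that already supports property changes gets COMMENT ON for free.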
@@ -124,9 +124,11 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)

val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
val comment = properties.get(TableCatalog.PROP_COMMENT)

try {
catalog.alterTable(catalogTable.copy(properties = properties, schema = schema))
catalog.alterTable(
catalogTable.copy(properties = properties, schema = schema, comment = comment))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
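The V2SessionCatalog change re-derives the comment from the merged properties so the v1 catalog entry's dedicated comment field stays in sync with the property map. A simplified standalone sketch of that synchronization, using a hypothetical `CatalogTable` stand-in:

```scala
// Hypothetical, pared-down version of the v1 catalog table metadata.
case class CatalogTable(
    properties: Map[String, String],
    comment: Option[String])

// After applying property changes, the comment field is refreshed from the
// merged properties so the two representations cannot drift apart.
def afterAlter(table: CatalogTable, changed: Map[String, String]): CatalogTable = {
  val props = table.properties ++ changed
  table.copy(properties = props, comment = props.get("comment"))
}
```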