
Commit 923b829

jzhuge authored and rdblue committed
[SPARK-26946][SQL] Identifiers for multi-catalog
- Support N-part identifier in SQL
- N-part identifier extractor in Analyzer
- A new unit test suite ResolveMultipartRelationSuite
- CatalogLoadingSuite

rblue cloud-fan mccheah

Closes apache#23848 from jzhuge/SPARK-26946.

Authored-by: John Zhuge <jzhuge@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent c0a2bdd commit 923b829

File tree: 13 files changed (+339, −6 lines)

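A note on the resolution model, before the diffs: the extractor added in LookupCatalog.scala below tries the first part of an N-part identifier as a catalog name; the remaining parts become a namespace plus an object name, and if no catalog matches, the whole identifier is treated as a session-catalog name. A minimal illustrative split (the identifier values are made up):

    // How "prod.db.events" is expected to split when "prod" is a registered catalog.
    val parts = Seq("prod", "db", "events")
    val catalogName = parts.head        // "prod", tried as a catalog name first
    val namespace = parts.tail.init     // Seq("db")
    val name = parts.tail.last          // "events"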

sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

Lines changed: 8 additions & 0 deletions
@@ -52,6 +52,10 @@ singleTableIdentifier
     : tableIdentifier EOF
     ;

+singleMultipartIdentifier
+    : multipartIdentifier EOF
+    ;
+
 singleFunctionIdentifier
     : functionIdentifier EOF
     ;

@@ -523,6 +527,10 @@ rowFormat
         (NULL DEFINED AS nullDefinedAs=STRING)? #rowFormatDelimited
     ;

+multipartIdentifier
+    : parts+=identifier ('.' parts+=identifier)*
+    ;
+
 tableIdentifier
     : (db=identifier '.')? table=identifier
     ;
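The new multipartIdentifier rule generalizes the existing two-part tableIdentifier to any number of dot-separated parts, and singleMultipartIdentifier anchors it with EOF for standalone parsing. Via the CatalystSqlParser entry point added later in this commit, it behaves roughly like this (a sketch, not a test):

    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

    // One result element per dot-separated identifier part.
    CatalystSqlParser.parseMultipartIdentifier("t")         // Seq("t")
    CatalystSqlParser.parseMultipartIdentifier("db.t")      // Seq("db", "t")
    CatalystSqlParser.parseMultipartIdentifier("cat.db.t")  // Seq("cat", "db", "t")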

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java

Lines changed: 4 additions & 3 deletions
@@ -40,13 +40,14 @@ private Catalogs() {
   * @param name a String catalog name
   * @param conf a SQLConf
   * @return an initialized CatalogProvider
-  * @throws SparkException If the provider class cannot be found or instantiated
+  * @throws CatalogNotFoundException if the plugin class cannot be found
+  * @throws SparkException if the plugin class cannot be instantiated
   */
  public static CatalogProvider load(String name, SQLConf conf) throws SparkException {
    String providerClassName = conf.getConfString("spark.sql.catalog." + name, null);
    if (providerClassName == null) {
-     throw new SparkException(String.format(
-         "Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name));
+     throw new CatalogNotFoundException(String.format(
+         "Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
    }

    ClassLoader loader = Utils.getContextOrSparkClassLoader();
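An undefined catalog name now fails fast with the narrower CatalogNotFoundException (added below), while reflection and instantiation problems still surface as a plain SparkException. A hedged sketch of the lookup path; the provider class name is hypothetical:

    import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, Catalogs}
    import org.apache.spark.sql.internal.SQLConf

    val conf = new SQLConf
    // Hypothetical provider class; any CatalogProvider implementation on the classpath works.
    conf.setConfString("spark.sql.catalog.testcat", "com.example.TestCatalogProvider")

    // No spark.sql.catalog.missing entry is defined, so this fails fast.
    try Catalogs.load("missing", conf) catch {
      case e: CatalogNotFoundException => println(e.getMessage)
    }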
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Identifier.java

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * Identifies an object in a catalog.
+ */
+@Experimental
+public interface Identifier {
+
+  static Identifier of(String[] namespace, String name) {
+    return new IdentifierImpl(namespace, name);
+  }
+
+  /**
+   * @return the namespace in the catalog
+   */
+  String[] namespace();
+
+  /**
+   * @return the object name
+   */
+  String name();
+}
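Callers construct identifiers through the static factory rather than the implementation class, for example:

    import org.apache.spark.sql.catalog.v2.Identifier

    // A table "events" in namespace ["prod", "db"] of some catalog.
    val ident = Identifier.of(Array("prod", "db"), "events")
    assert(ident.namespace.sameElements(Array("prod", "db")))
    assert(ident.name == "events")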
sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/IdentifierImpl.java

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2;
+
+import org.apache.spark.annotation.Experimental;
+
+/**
+ * An {@link Identifier} implementation.
+ */
+@Experimental
+class IdentifierImpl implements Identifier {
+
+  private String[] namespace;
+  private String name;
+
+  IdentifierImpl(String[] namespace, String name) {
+    this.namespace = namespace;
+    this.name = name;
+  }
+
+  @Override
+  public String[] namespace() {
+    return namespace;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+}
sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogNotFoundException.scala

Lines changed: 28 additions & 0 deletions

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Experimental
+
+@Experimental
+class CatalogNotFoundException(message: String, cause: Throwable)
+  extends SparkException(message, cause) {
+
+  def this(message: String) = this(message, null)
+}
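Because the new exception extends SparkException, existing handlers that catch SparkException keep working, while new code can match the narrower type first; for instance:

    import org.apache.spark.SparkException
    import org.apache.spark.sql.catalog.v2.CatalogNotFoundException

    // Order matters: match the narrower type before its supertype.
    def describe(e: Throwable): String = e match {
      case _: CatalogNotFoundException => "catalog name is not configured"
      case _: SparkException           => "catalog plugin could not be instantiated"
      case _                           => "unexpected failure"
    }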
sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/LookupCatalog.scala

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalog.v2
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.TableIdentifier
+
+/**
+ * A trait to encapsulate catalog lookup function and helpful extractors.
+ */
+@Experimental
+trait LookupCatalog {
+
+  def lookupCatalog: Option[(String) => CatalogPlugin] = None
+
+  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)
+
+  /**
+   * Extract catalog plugin and identifier from a multi-part identifier.
+   */
+  object CatalogObjectIdentifier {
+    def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup =>
+      parts match {
+        case Seq(name) =>
+          (None, Identifier.of(Array.empty, name))
+        case Seq(catalogName, tail @ _*) =>
+          try {
+            val catalog = lookup(catalogName)
+            (Some(catalog), Identifier.of(tail.init.toArray, tail.last))
+          } catch {
+            case _: CatalogNotFoundException =>
+              (None, Identifier.of(parts.init.toArray, parts.last))
+          }
+      }
+    }
+  }
+
+  /**
+   * Extract legacy table identifier from a multi-part identifier.
+   *
+   * For legacy support only. Please use
+   * [[org.apache.spark.sql.catalog.v2.LookupCatalog.CatalogObjectIdentifier]] in DSv2 code paths.
+   */
+  object AsTableIdentifier {
+    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
+      case CatalogObjectIdentifier(None, ident) =>
+        ident.namespace match {
+          case Array() =>
+            Some(TableIdentifier(ident.name))
+          case Array(database) =>
+            Some(TableIdentifier(ident.name, Some(database)))
+          case _ =>
+            None
+        }
+      case _ =>
+        None
+    }
+  }
+}
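The two extractors are meant to be pattern-matched against the Seq[String] produced by parseMultipartIdentifier. A hedged sketch of a resolver mixing in the trait; DemoResolver and its catalog registry are illustrative, not part of this commit:

    import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
    import org.apache.spark.sql.catalyst.TableIdentifier

    class DemoResolver(catalogs: Map[String, CatalogPlugin]) extends LookupCatalog {
      // The extractor relies on the lookup function throwing CatalogNotFoundException on a miss.
      override def lookupCatalog: Option[String => CatalogPlugin] = Some { name =>
        catalogs.getOrElse(name, throw new CatalogNotFoundException(s"unknown catalog: $name"))
      }

      def describe(parts: Seq[String]): String = parts match {
        case CatalogObjectIdentifier(Some(catalog), ident) =>
          s"v2 object $ident in catalog $catalog"            // first part named a catalog
        case AsTableIdentifier(TableIdentifier(table, db)) =>
          s"session catalog table ${db.fold(table)(d => s"$d.$table")}"
        case _ =>
          "multi-part name with no matching catalog"         // e.g. 3+ parts, unknown catalog
      }
    }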

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 9 additions & 2 deletions
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random

 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog}
 import org.apache.spark.sql.catalyst.{ScalaReflection, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogRelation, InMemoryCatalog, MaybeCatalogRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.OuterScopes

@@ -93,13 +94,19 @@ object AnalysisContext {
 class Analyzer(
     catalog: SessionCatalog,
     conf: SQLConf,
-    maxIterations: Int)
-  extends RuleExecutor[LogicalPlan] with CheckAnalysis {
+    maxIterations: Int,
+    override val lookupCatalog: Option[(String) => CatalogPlugin] = None)
+  extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {

   def this(catalog: SessionCatalog, conf: SQLConf) = {
     this(catalog, conf, conf.optimizerMaxIterations)
   }

+  def this(lookupCatalog: Option[(String) => CatalogPlugin], catalog: SessionCatalog,
+      conf: SQLConf) = {
+    this(catalog, conf, conf.optimizerMaxIterations, lookupCatalog)
+  }
+
   def executeAndCheck(plan: LogicalPlan): LogicalPlan = {
     val analyzed = execute(plan)
     try {
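The new secondary constructor lets callers supply a catalog lookup function without also spelling out maxIterations. A construction sketch; the session catalog, conf, and lookup function are assumed to come from surrounding code:

    import org.apache.spark.sql.catalog.v2.CatalogPlugin
    import org.apache.spark.sql.catalyst.analysis.Analyzer
    import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    import org.apache.spark.sql.internal.SQLConf

    def makeAnalyzer(
        sessionCatalog: SessionCatalog,
        conf: SQLConf,
        findCatalog: String => CatalogPlugin): Analyzer =
      // maxIterations defaults to conf.optimizerMaxIterations via the new constructor.
      new Analyzer(Some(findCatalog), sessionCatalog, conf)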

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 13 additions & 0 deletions
@@ -87,6 +87,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     visitFunctionIdentifier(ctx.functionIdentifier)
   }

+  override def visitSingleMultipartIdentifier(
+      ctx: SingleMultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
+    visitMultipartIdentifier(ctx.multipartIdentifier)
+  }
+
   override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) {
     visitSparkDataType(ctx.dataType)
   }

@@ -909,6 +914,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     FunctionIdentifier(ctx.function.getText, Option(ctx.db).map(_.getText))
   }

+  /**
+   * Create a multi-part identifier.
+   */
+  override def visitMultipartIdentifier(
+      ctx: MultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
+    ctx.parts.asScala.map(_.getText)
+  }
+
   /* ********************************************************************************************
    * Expression parsing
    * ******************************************************************************************** */

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala

Lines changed: 7 additions & 0 deletions
@@ -57,6 +57,13 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
     }
   }

+  /** Creates a multi-part identifier for a given SQL string */
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    parse(sqlText) { parser =>
+      astBuilder.visitSingleMultipartIdentifier(parser.singleMultipartIdentifier())
+    }
+  }
+
   /**
    * Creates StructType for a given SQL string, which is a comma separated list of field
    * definitions which will preserve the correct Hive metadata.
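As with the other single* entry points, the EOF anchor in singleMultipartIdentifier means trailing or malformed input is rejected with a ParseException; a quick sketch:

    import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

    CatalystSqlParser.parseMultipartIdentifier("a.b.c")   // Seq("a", "b", "c")

    // A dangling dot cannot match `multipartIdentifier EOF`, so parsing throws.
    try CatalystSqlParser.parseMultipartIdentifier("a.b.") catch {
      case e: ParseException => println(s"rejected: ${e.getMessage}")
    }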

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala

Lines changed: 6 additions & 0 deletions
@@ -52,6 +52,12 @@ trait ParserInterface {
   @throws[ParseException]("Text cannot be parsed to a FunctionIdentifier")
   def parseFunctionIdentifier(sqlText: String): FunctionIdentifier

+  /**
+   * Parse a string to a multi-part identifier.
+   */
+  @throws[ParseException]("Text cannot be parsed to a multi-part identifier")
+  def parseMultipartIdentifier(sqlText: String): Seq[String]
+
   /**
    * Parse a string to a [[StructType]]. The passed SQL string should be a comma separated list
    * of field definitions which will preserve the correct Hive metadata.
