
[SPARK-26946][SQL] Identifiers for multi-catalog #23848


Closed · wants to merge 10 commits
@@ -63,6 +63,10 @@ singleTableIdentifier
: tableIdentifier EOF
;

singleMultipartIdentifier
: multipartIdentifier EOF
;

singleFunctionIdentifier
: functionIdentifier EOF
;
@@ -554,6 +558,10 @@ rowFormat
(NULL DEFINED AS nullDefinedAs=STRING)? #rowFormatDelimited
;

multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;

tableIdentifier
: (db=identifier '.')? table=identifier
;
@@ -44,12 +44,14 @@ private Catalogs() {
* @param name a String catalog name
* @param conf a SQLConf
* @return an initialized CatalogPlugin
* @throws SparkException If the plugin class cannot be found or instantiated
* @throws CatalogNotFoundException if the plugin class cannot be found
* @throws SparkException if the plugin class cannot be instantiated
*/
public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
public static CatalogPlugin load(String name, SQLConf conf)
throws CatalogNotFoundException, SparkException {
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
if (pluginClassName == null) {
throw new SparkException(String.format(
throw new CatalogNotFoundException(String.format(
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
}

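To make the new error contract concrete, here is a rough usage sketch; the "prod" catalog name and com.example.ProdCatalogPlugin class are invented for illustration:

import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, Catalogs}
import org.apache.spark.sql.internal.SQLConf

val conf = new SQLConf()
// A catalog named "prod" would be registered like this (hypothetical plugin class):
// conf.setConfString("spark.sql.catalog.prod", "com.example.ProdCatalogPlugin")

try {
  Catalogs.load("missing", conf)   // spark.sql.catalog.missing is not defined
} catch {
  case e: CatalogNotFoundException =>
    // An undefined catalog is now distinguishable from a plugin that fails to
    // instantiate, which still surfaces as a plain SparkException.
    println(e.getMessage)
}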
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;

/**
* Identifies an object in a catalog.
*/
@Experimental
public interface Identifier {
Contributor:

Shall we use a class directly? I don't see much value in using an interface here, as it has only one implementation.

Contributor:

This allows us more flexibility than a single concrete class. Changing a class to an interface is not a binary compatible change, so using an interface is the right thing to do.

Contributor:

Then I suggest we move the impl class to a private package like org.apache.spark.sql.catalyst. Also the static method should be moved to the impl class as well, as we only create it inside Spark.

Contributor:

The implementation class is package-private. If we were to move it to a different package, we would need to make it public for the of() factory method, which would increase its visibility, not decrease it.


static Identifier of(String[] namespace, String name) {
return new IdentifierImpl(namespace, name);
}

/**
* @return the namespace in the catalog
*/
String[] namespace();

/**
* @return the object name
*/
String name();
}
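A brief usage sketch of the factory method; the namespace and object names below are made up:

import org.apache.spark.sql.catalog.v2.Identifier

// The two-part name db.events within some catalog:
val ident = Identifier.of(Array("db"), "events")
assert(ident.namespace.sameElements(Array("db")))
assert(ident.name == "events")

// A top-level object with an empty namespace:
val topLevel = Identifier.of(Array.empty[String], "metrics")
assert(topLevel.namespace.isEmpty)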
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;

/**
* An {@link Identifier} implementation.
*/
@Experimental
class IdentifierImpl implements Identifier {

private String[] namespace;
private String name;

IdentifierImpl(String[] namespace, String name) {
this.namespace = namespace;
this.name = name;
}

@Override
public String[] namespace() {
return namespace;
}

@Override
public String name() {
return name;
}
}
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2

import org.apache.spark.SparkException
import org.apache.spark.annotation.Experimental

@Experimental
class CatalogNotFoundException(message: String, cause: Throwable)
extends SparkException(message, cause) {

def this(message: String) = this(message, null)
}
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier

/**
* A trait to encapsulate catalog lookup function and helpful extractors.
*/
@Experimental
trait LookupCatalog {
Contributor:

Why is it a trait?

My understanding is that this PR adds the catalog object identifier class and the related parser support. I don't think we have a detailed design of how the analyzer looks up catalogs yet.

Contributor:

This trait provides extractors, similar to a trait like PredicateHelper. These implement the resolution rules from the SPIP using a generic catalog lookup provided by the implementation.

This decouples the resolution rules from how the analyzer looks up catalogs and provides convenient extractors that implement those rules.

Contributor:

Then this should be an internal trait under a private package like org.apache.spark.sql.catalyst.


def lookupCatalog: Option[(String) => CatalogPlugin] = None

type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)

/**
* Extract catalog plugin and identifier from a multi-part identifier.
*/
object CatalogObjectIdentifier {
def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup =>
parts match {
case Seq(name) =>
(None, Identifier.of(Array.empty, name))
case Seq(catalogName, tail @ _*) =>
try {
val catalog = lookup(catalogName)
(Some(catalog), Identifier.of(tail.init.toArray, tail.last))
} catch {
case _: CatalogNotFoundException =>
(None, Identifier.of(parts.init.toArray, parts.last))
}
}
}
}

/**
* Extract legacy table identifier from a multi-part identifier.
*
* For legacy support only. Please use
* [[org.apache.spark.sql.catalog.v2.LookupCatalog.CatalogObjectIdentifier]] in DSv2 code paths.
*/
object AsTableIdentifier {
def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
case CatalogObjectIdentifier(None, ident) =>
ident.namespace match {
case Array() =>
Some(TableIdentifier(ident.name))
case Array(database) =>
Some(TableIdentifier(ident.name, Some(database)))
case _ =>
None
}
case _ =>
None
}
}
}
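To illustrate how these extractors might be consumed, here is a hedged sketch of a resolution helper; ExampleResolver and its describe method are invented, and the catalog lookup function is assumed to be supplied by the caller:

import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical: mixes in LookupCatalog and pattern-matches parsed name parts.
class ExampleResolver(catalogs: String => CatalogPlugin) extends LookupCatalog {
  override def lookupCatalog: Option[String => CatalogPlugin] = Some(catalogs)

  def describe(nameParts: Seq[String]): String = nameParts match {
    // catalog.db.table (or deeper): the leading part named a registered catalog.
    case CatalogObjectIdentifier(Some(catalog), ident) =>
      s"v2: namespace=${ident.namespace.mkString(".")}, name=${ident.name} in $catalog"
    // One- or two-part names (or an unknown leading catalog) map to the session catalog.
    case AsTableIdentifier(tableIdent) =>
      s"v1: $tableIdent"
    // e.g. a.b.c where a is not a registered catalog: not a legacy table identifier.
    case _ =>
      "unresolved"
  }
}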
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -95,13 +96,19 @@ object AnalysisContext {
class Analyzer(
catalog: SessionCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
maxIterations: Int,
override val lookupCatalog: Option[(String) => CatalogPlugin] = None)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {

def this(catalog: SessionCatalog, conf: SQLConf) = {
this(catalog, conf, conf.optimizerMaxIterations)
}

def this(lookupCatalog: Option[(String) => CatalogPlugin], catalog: SessionCatalog,
Contributor:

Who will call this constructor? I feel we are adding too much code for future use only. Can we add these when they are needed? It would be good if this PR only added the identifier interface and impl class, and the related parser rules, which is pretty easy to justify.

Contributor:

@cloud-fan, I think this commit is reasonably self-contained. Nit-picking about whether a constructor is added in this commit or the next isn't adding much value.

Keep in mind that we make commits self-contained to decrease conflicts and increase the rate at which we can review and accept patches. Is putting this in the next commit really worth the time it takes to change and test that change, if it means that this work is delayed another day?

conf: SQLConf) = {
this(catalog, conf, conf.optimizerMaxIterations, lookupCatalog)
}

def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
AnalysisHelper.markInAnalyzer {
val analyzed = executeAndTrack(plan, tracker)
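A hedged sketch of how the new constructor might be wired up; the newAnalyzer helper is hypothetical and simply delegates catalog lookup to Catalogs.load:

import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Catalogs}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper: names registered as spark.sql.catalog.<name> become
// visible to the analyzer through its LookupCatalog extractors.
def newAnalyzer(sessionCatalog: SessionCatalog, conf: SQLConf): Analyzer = {
  val lookup: String => CatalogPlugin = name => Catalogs.load(name, conf)
  new Analyzer(Some(lookup), sessionCatalog, conf)
}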
@@ -86,6 +86,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
visitFunctionIdentifier(ctx.functionIdentifier)
}

override def visitSingleMultipartIdentifier(
ctx: SingleMultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
visitMultipartIdentifier(ctx.multipartIdentifier)
}

override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) {
visitSparkDataType(ctx.dataType)
}
@@ -957,6 +962,14 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
FunctionIdentifier(ctx.function.getText, Option(ctx.db).map(_.getText))
}

/**
* Create a multi-part identifier.
*/
override def visitMultipartIdentifier(
ctx: MultipartIdentifierContext): Seq[String] = withOrigin(ctx) {
ctx.parts.asScala.map(_.getText)
}

/* ********************************************************************************************
* Expression parsing
* ******************************************************************************************** */
@@ -57,6 +57,13 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
}
}

/** Creates a multi-part identifier for a given SQL string */
override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
parse(sqlText) { parser =>
astBuilder.visitSingleMultipartIdentifier(parser.singleMultipartIdentifier())
}
}

/**
* Creates StructType for a given SQL string, which is a comma separated list of field
* definitions which will preserve the correct Hive metadata.
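A quick sketch of the new parseMultipartIdentifier entry point through CatalystSqlParser, which extends AbstractSqlParser; the identifier strings are arbitrary examples:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Dot-separated names split into their parts, one element per identifier:
assert(CatalystSqlParser.parseMultipartIdentifier("tbl") == Seq("tbl"))
assert(CatalystSqlParser.parseMultipartIdentifier("db.tbl") == Seq("db", "tbl"))
assert(CatalystSqlParser.parseMultipartIdentifier("catalog.db.tbl") == Seq("catalog", "db", "tbl"))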
@@ -52,6 +52,12 @@ trait ParserInterface {
@throws[ParseException]("Text cannot be parsed to a FunctionIdentifier")
def parseFunctionIdentifier(sqlText: String): FunctionIdentifier

/**
* Parse a string to a multi-part identifier.
*/
@throws[ParseException]("Text cannot be parsed to a multi-part identifier")
def parseMultipartIdentifier(sqlText: String): Seq[String]

/**
* Parse a string to a [[StructType]]. The passed SQL string should be a comma separated list
* of field definitions which will preserve the correct Hive metadata.
@@ -66,7 +66,8 @@ public void testInitializationOptions() throws SparkException {
public void testLoadWithoutConfig() {
SQLConf conf = new SQLConf();

SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf));
SparkException exc = intercept(CatalogNotFoundException.class,
() -> Catalogs.load("missing", conf));

Assert.assertTrue("Should complain that implementation is not configured",
exc.getMessage()