[SPARK-26946][SQL] Identifiers for multi-catalog #23848
New file: Identifier.java (@@ -0,0 +1,41 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;

/**
 * Identifies an object in a catalog.
 */
@Experimental
public interface Identifier {

  static Identifier of(String[] namespace, String name) {
    return new IdentifierImpl(namespace, name);
  }

  /**
   * @return the namespace in the catalog
   */
  String[] namespace();

  /**
   * @return the object name
   */
  String name();
}
```
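For illustration only, here is a minimal sketch of building an identifier through the `of` factory method from Scala. The namespace and table names are made up and nothing in this snippet is part of the patch.

```scala
import org.apache.spark.sql.catalog.v2.Identifier

object IdentifierExample extends App {
  // Hypothetical example: a two-level namespace plus an object name.
  val ident: Identifier = Identifier.of(Array("db1", "schema1"), "table1")

  assert(ident.namespace.sameElements(Array("db1", "schema1")))
  assert(ident.name == "table1")
}
```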
New file: IdentifierImpl.java (@@ -0,0 +1,45 @@)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;

/**
 * An {@link Identifier} implementation.
 */
@Experimental
class IdentifierImpl implements Identifier {

  private String[] namespace;
  private String name;

  IdentifierImpl(String[] namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  @Override
  public String[] namespace() {
    return namespace;
  }

  @Override
  public String name() {
    return name;
  }
}
```
New file: CatalogNotFoundException.scala (@@ -0,0 +1,28 @@)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2

import org.apache.spark.SparkException
import org.apache.spark.annotation.Experimental

@Experimental
class CatalogNotFoundException(message: String, cause: Throwable)
  extends SparkException(message, cause) {

  def this(message: String) = this(message, null)
}
```
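For context on how this exception is meant to participate in catalog lookup, here is a sketch under the assumption that a lookup function of shape `String => CatalogPlugin` throws `CatalogNotFoundException` for unknown names, which is what lets the extractors in `LookupCatalog` (next file) fall back to treating the first identifier part as a namespace. The registry map and helper name are hypothetical.

```scala
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin}

object CatalogLookupExample {
  // Hypothetical helper: build a lookup function over a map of loaded catalog plugins.
  def makeLookup(catalogs: Map[String, CatalogPlugin]): String => CatalogPlugin =
    name => catalogs.getOrElse(
      name,
      throw new CatalogNotFoundException(s"Catalog not found: $name"))
}
```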
New file: LookupCatalog.scala (@@ -0,0 +1,74 @@)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.TableIdentifier

/**
 * A trait to encapsulate catalog lookup function and helpful extractors.
 */
@Experimental
trait LookupCatalog {

  def lookupCatalog: Option[(String) => CatalogPlugin] = None

  type CatalogObjectIdentifier = (Option[CatalogPlugin], Identifier)

  /**
   * Extract catalog plugin and identifier from a multi-part identifier.
   */
  object CatalogObjectIdentifier {
    def unapply(parts: Seq[String]): Option[CatalogObjectIdentifier] = lookupCatalog.map { lookup =>
      parts match {
        case Seq(name) =>
          (None, Identifier.of(Array.empty, name))
        case Seq(catalogName, tail @ _*) =>
          try {
            val catalog = lookup(catalogName)
            (Some(catalog), Identifier.of(tail.init.toArray, tail.last))
          } catch {
            case _: CatalogNotFoundException =>
              (None, Identifier.of(parts.init.toArray, parts.last))
          }
      }
    }
  }

  /**
   * Extract legacy table identifier from a multi-part identifier.
   *
   * For legacy support only. Please use
   * [[org.apache.spark.sql.catalog.v2.LookupCatalog.CatalogObjectIdentifier]] in DSv2 code paths.
   */
  object AsTableIdentifier {
    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
      case CatalogObjectIdentifier(None, ident) =>
        ident.namespace match {
          case Array() =>
            Some(TableIdentifier(ident.name))
          case Array(database) =>
            Some(TableIdentifier(ident.name, Some(database)))
          case _ =>
            None
        }
      case _ =>
        None
    }
  }
}
```

Review comment (on `trait LookupCatalog`): Why is it a trait? My understanding is that this PR adds the class for the catalog object identifier and the related parser support. I don't think we have a detailed design for how the analyzer looks up catalogs yet.

Reply: This trait provides extractors, similar to other helper traits. It decouples the resolution rules from how the analyzer looks up catalogs and provides convenient extractors that implement those rules.

Reply: Then this should be an internal trait under a private package like `org.apache.spark.sql.catalyst`.
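As a sketch of how a consumer might use these extractors (the `IdentifierDemo` class, catalog map, and names below are hypothetical and not part of this change), a multi-part identifier either resolves against a registered catalog plugin or falls back to a legacy `TableIdentifier`:

```scala
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical consumer that mixes in LookupCatalog to pattern-match multi-part identifiers.
class IdentifierDemo(catalogs: Map[String, CatalogPlugin]) extends LookupCatalog {

  // Unknown names must throw CatalogNotFoundException so the extractor can fall back.
  override def lookupCatalog: Option[String => CatalogPlugin] =
    Some(name => catalogs.getOrElse(
      name, throw new CatalogNotFoundException(s"Catalog not found: $name")))

  def describe(parts: Seq[String]): String = parts match {
    // e.g. Seq("testcat", "db", "tbl") where "testcat" is a registered catalog plugin.
    case CatalogObjectIdentifier(Some(catalog), ident) =>
      s"v2: catalog=$catalog namespace=${ident.namespace.mkString(".")} name=${ident.name}"
    // e.g. Seq("db", "tbl") or Seq("tbl"): no catalog, expressible as a legacy TableIdentifier.
    case AsTableIdentifier(TableIdentifier(table, database)) =>
      s"legacy: table=$table database=$database"
    // e.g. Seq("unknown", "db", "tbl"): more than two parts and no matching catalog.
    case _ =>
      s"unresolved: ${parts.mkString(".")}"
  }
}
```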
Changes to Analyzer.scala:

```diff
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, LookupCatalog}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -95,13 +96,19 @@ object AnalysisContext {
 class Analyzer(
     catalog: SessionCatalog,
     conf: SQLConf,
-    maxIterations: Int)
-  extends RuleExecutor[LogicalPlan] with CheckAnalysis {
+    maxIterations: Int,
+    override val lookupCatalog: Option[(String) => CatalogPlugin] = None)
+  extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
 
   def this(catalog: SessionCatalog, conf: SQLConf) = {
     this(catalog, conf, conf.optimizerMaxIterations)
   }
 
+  def this(lookupCatalog: Option[(String) => CatalogPlugin], catalog: SessionCatalog,
+      conf: SQLConf) = {
+    this(catalog, conf, conf.optimizerMaxIterations, lookupCatalog)
+  }
+
   def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
     AnalysisHelper.markInAnalyzer {
       val analyzed = executeAndTrack(plan, tracker)
```

Review comment (on the new auxiliary constructor): Who will call this constructor? I feel we are adding too much code for future use only. Can we add it when it is needed? It would be good if this PR only added the identifier interface and impl class, and the related parser rules, which is pretty easy to justify.

Reply: @cloud-fan, I think this commit is reasonably self-contained. Nit-picking about whether a constructor is added in this commit or the next isn't adding much value. Keep in mind that we make commits self-contained to decrease conflicts and increase the rate at which we can review and accept patches. Is putting this in the next commit really worth the time it takes to change and test that change, if it means that this work is delayed another day?
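To show how a caller might exercise the new constructor (a sketch only: `buildAnalyzer` and the plugin map are hypothetical, while `Analyzer`, `SessionCatalog`, and `SQLConf` come from the existing signature), a catalog lookup function would be threaded in alongside the session catalog and configuration:

```scala
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.internal.SQLConf

object AnalyzerWiringExample {
  // Hypothetical wiring: resolve catalog names against a map of loaded plugins.
  def buildAnalyzer(
      plugins: Map[String, CatalogPlugin],
      sessionCatalog: SessionCatalog,
      conf: SQLConf): Analyzer = {
    val lookup: String => CatalogPlugin = name =>
      plugins.getOrElse(name, throw new CatalogNotFoundException(s"Catalog not found: $name"))
    new Analyzer(Some(lookup), sessionCatalog, conf)
  }
}
```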
Review comment (on the `Identifier` interface): Shall we use a class directly? I don't see much value in using an interface here, as it has only one implementation.

Reply: This allows us more flexibility than a single concrete class. Changing a class to an interface is not a binary-compatible change, so using an interface is the right thing to do.

Reply: Then I suggest we move the impl class to a private package like `org.apache.spark.sql.catalyst`. Also, the static method should be moved to the impl class as well, as we only create it inside Spark.

Reply: The implementation class is package-private. If we were to move it to a different package, we would need to make it public for the `of` factory method, which would increase its visibility, not decrease it.