Skip to content

[SPARK-27661][SQL] Add SupportsNamespaces API #24560

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

/**
* NamespaceChange subclasses represent requested changes to a namespace. These are passed to
* {@link SupportsNamespaces#alterNamespace}. For example,
* <pre>
* import NamespaceChange._
* val catalog = Catalogs.load(name)
* catalog.alterNamespace(ident,
* setProperty("prop", "value"),
* removeProperty("other_prop")
* )
* </pre>
*/
public interface NamespaceChange {
/**
* Create a NamespaceChange for setting a namespace property.
* <p>
* If the property already exists, it will be replaced with the new value.
*
* @param property the property name
* @param value the new property value
* @return a NamespaceChange for the addition
*/
static NamespaceChange setProperty(String property, String value) {
return new SetProperty(property, value);
}

/**
* Create a NamespaceChange for removing a namespace property.
* <p>
* If the property does not exist, the change will succeed.
*
* @param property the property name
* @return a NamespaceChange for the addition
*/
static NamespaceChange removeProperty(String property) {
return new RemoveProperty(property);
}

/**
* A NamespaceChange to set a namespace property.
* <p>
* If the property already exists, it must be replaced with the new value.
*/
final class SetProperty implements NamespaceChange {
private final String property;
private final String value;

private SetProperty(String property, String value) {
this.property = property;
this.value = value;
}

public String property() {
return property;
}

public String value() {
return value;
}
}

/**
* A NamespaceChange to remove a namespace property.
* <p>
* If the property does not exist, the change should succeed.
*/
final class RemoveProperty implements NamespaceChange {
private final String property;

private RemoveProperty(String property) {
this.property = property;
}

public String property() {
return property;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;

import java.util.Map;

/**
* Catalog methods for working with namespaces.
* <p>
* If an object such as a table, view, or function exists, its parent namespaces must also exist
* and must be returned by the discovery methods {@link #listNamespaces()} and
* {@link #listNamespaces(String[])}.
* <p>
* Catalog implementations are not required to maintain the existence of namespaces independent of
* objects in a namespace. For example, a function catalog that loads functions using reflection
* and uses Java packages as namespaces is not required to support the methods to create, alter, or
* drop a namespace. Implementations are allowed to discover the existence of objects or namespaces
* without throwing {@link NoSuchNamespaceException} when no namespace is found.
*/
public interface SupportsNamespaces extends CatalogPlugin {

/**
* Return a default namespace for the catalog.
* <p>
* When this catalog is set as the current catalog, the namespace returned by this method will be
* set as the current namespace.
* <p>
* The namespace returned by this method is not required to exist.
*
* @return a multi-part namespace
*/
default String[] defaultNamespace() {
return new String[0];
}

/**
* List top-level namespaces from the catalog.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this list the top-level namespaces or within the defaultNamespace? If it's also just going to return a, why does it need to return an array of arrays instead of just an array?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should list top-level namespaces. The default namespace is the one used as Spark's current namespace when the catalog is the default and should not change the behavior of the catalog.

It returns an array of arrays so you can pass each result into listNamespaces(Array[String]). Array[String] is the type that we consistently use for a namespace, so I think it is correct to return it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding to my comment about listing top-level namespaces: if the default namespace changed the catalog's behavior, we would have to document exactly how it should change the behavior and have a way to configure it.

I like keeping this simple instead, so these methods do the same thing every time and Spark doesn't require the plugin to maintain a current namespace as state.

* <p>
* If an object such as a table, view, or function exists, its parent namespaces must also exist
* and must be returned by this discovery method. For example, if table a.b.t exists, this method
* must return ["a"] in the result array.
*
* @return an array of multi-part namespace names
*/
String[][] listNamespaces() throws NoSuchNamespaceException;

/**
* List namespaces in a namespace.
* <p>
* If an object such as a table, view, or function exists, its parent namespaces must also exist
* and must be returned by this discovery method. For example, if table a.b.t exists, this method
* invoked as listNamespaces(["a"]) must return ["a", "b"] in the result array.
*
* @param namespace a multi-part namespace
* @return an array of multi-part namespace names
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
*/
String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException;

/**
* Test whether a namespace exists.
* <p>
* If an object such as a table, view, or function exists, its parent namespaces must also exist.
* For example, if table a.b.t exists, this method invoked as namespaceExists(["a"]) or
* namespaceExists(["a", "b"]) must return true.
*
* @param namespace a multi-part namespace
* @return true if the namespace exists, false otherwise
*/
default boolean namespaceExists(String[] namespace) {
try {
loadNamespaceMetadata(namespace);
return true;
} catch (NoSuchNamespaceException e) {
return false;
}
}

/**
* Load metadata properties for a namespace.
*
* @param namespace a multi-part namespace
* @return a string map of properties for the given namespace
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
* @throws UnsupportedOperationException If namespace properties are not supported
*/
Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException;

/**
* Create a namespace in the catalog.
*
* @param namespace a multi-part namespace
* @param metadata a string map of properties for the given namespace
* @throws NamespaceAlreadyExistsException If the namespace already exists
* @throws UnsupportedOperationException If create is not a supported operation
*/
void createNamespace(
String[] namespace,
Map<String, String> metadata) throws NamespaceAlreadyExistsException;

/**
* Apply a set of metadata changes to a namespace in the catalog.
*
* @param namespace a multi-part namespace
* @param changes a collection of changes to apply to the namespace
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
* @throws UnsupportedOperationException If namespace properties are not supported
*/
void alterNamespace(
String[] namespace,
NamespaceChange... changes) throws NoSuchNamespaceException;

/**
* Drop a namespace from the catalog.
* <p>
* This operation may be rejected by the catalog implementation if the namespace is not empty by
* throwing {@link IllegalStateException}. If the catalog implementation does not support this
* operation, it may throw {@link UnsupportedOperationException}.
*
* @param namespace a multi-part namespace
* @return true if the namespace was dropped
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
* @throws IllegalStateException If the namespace is not empty
* @throws UnsupportedOperationException If drop is not a supported operation
*/
boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we not have a boolean flag here called cascade that deletes all the Namespaces and Tables under this namespace if it is not empty?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can consider adding it later. We should decide whether we want to require implementations to build that, or whether that should be done by Spark.

Copy link
Contributor

@brkyvz brkyvz Jul 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. Spark can list all tables within all namespaces and delete them if cascade = true, and not have the catalog do it. I can see the pros of that. I think something like that opens you up to race conditions too though. This is fine for now, just wanted to bring it up. If there's a real need (ask) we can always add it later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm generally in favor of Spark taking care of operations like these. Otherwise it puts a lot of responsibility on catalog plugins and we want to avoid making those too complex. Required complexity in a plugin will lead to buggy implementations and unreliable behavior across plugins.

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, NamespaceChange, TableChange}
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.sources.v2.Table
Expand All @@ -31,6 +31,37 @@ import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
object CatalogV2Util {
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

/**
* Apply properties changes to a map and return the result.
*/
def applyNamespaceChanges(
properties: Map[String, String],
changes: Seq[NamespaceChange]): Map[String, String] = {
applyNamespaceChanges(properties.asJava, changes).asScala.toMap
}

/**
* Apply properties changes to a Java map and return the result.
*/
def applyNamespaceChanges(
properties: util.Map[String, String],
changes: Seq[NamespaceChange]): util.Map[String, String] = {
val newProperties = new util.HashMap[String, String](properties)

changes.foreach {
case set: NamespaceChange.SetProperty =>
newProperties.put(set.property, set.value)

case unset: NamespaceChange.RemoveProperty =>
newProperties.remove(unset.property)

case _ =>
// ignore non-property changes
}

Collections.unmodifiableMap(newProperties)
}

/**
* Apply properties changes to a map and return the result.
*/
Expand Down
Loading