
[SPARK-24252][SQL] Add v2 catalog plugin system #23915

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* A marker interface to provide a catalog implementation for Spark.
* <p>
* Implementations can provide catalog functions by implementing additional interfaces for tables,
* views, and functions.
* <p>
* Catalog implementations must implement this marker interface to be loaded by
* {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
* required public no-arg constructor. After creating an instance, it will be configured by calling
* {@link #initialize(String, CaseInsensitiveStringMap)}.
* <p>
* Catalog implementations are registered to a name by adding a configuration option to Spark:
* {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
* in the Spark configuration that share the catalog name prefix,
* {@code spark.sql.catalog.catalog-name.(key)=(value)}, are passed in the case-insensitive
* string map of options during initialization, with the prefix removed.
* The {@code name} used to register the catalog is also passed as the catalog's name; in this case, "catalog-name".
*/
@Experimental
public interface CatalogPlugin {
/**
* Called to initialize configuration.
* <p>
* This method is called once, just after the provider is instantiated.
*
* @param name the name used to identify and load this catalog
* @param options a case-insensitive string map of configuration
*/
void initialize(String name, CaseInsensitiveStringMap options);
Contributor:

It's odd to ask the catalog plugin to report its name when initialize already requires a name.

Contributor:

shall this be void initialize(CaseInsensitiveStringMap options);?

Contributor Author:

The catalog is passed the name that was used to identify it. For example, say I have a REST-based catalog endpoint that I'm configuring in two cases, like this:

spark.sql.catalog.prod = com.example.MyCatalogPlugin
spark.sql.catalog.prod.connuri = http://prod.catalog.example.com:80/
spark.sql.catalog.test = com.example.MyCatalogPlugin
spark.sql.catalog.test.connuri = http://test.catalog.example.com:80/

MyCatalogPlugin is instantiated and configured twice, and each time it is passed the name it was configured with: prod and test.

Adding a getter for name just makes it easy to identify the catalog without Spark keeping track of name -> catalog instance everywhere.

Contributor:

In this case, how would the MyCatalogPlugin report its name? prod or test?

Member:

^ Depends on which catalog instance: one would say prod, the other would say test.
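
To make the answer concrete, a hypothetical sketch using the prod/test configuration from the example above (conf is assumed to be the session's SQLConf holding those properties):

CatalogPlugin prod = Catalogs.load("prod", conf);  // new com.example.MyCatalogPlugin instance
CatalogPlugin test = Catalogs.load("test", conf);  // a second, independent instance
prod.name();  // "prod"
test.name();  // "test"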


/**
* Called to get this catalog's name.
* <p>
* This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
* called to pass the catalog's name.
*/
String name();
}
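
For illustration, a minimal sketch of a plugin satisfying this contract; the class name and the connuri option are hypothetical and not part of this PR:

package com.example;

import org.apache.spark.sql.catalog.v2.CatalogPlugin;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class MyCatalogPlugin implements CatalogPlugin {
  private String name;
  private String connUri;

  // Public no-arg constructor required by Catalogs#load.
  public MyCatalogPlugin() {
  }

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    // Remember the name this catalog was registered under, e.g. "prod" or "test",
    // and read options that were passed with the spark.sql.catalog.(name). prefix stripped.
    this.name = name;
    this.connUri = options.get("connuri");
  }

  @Override
  public String name() {
    return name;
  }
}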
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@Private
public class Catalogs {
private Catalogs() {
}

/**
* Load and configure a catalog by name.
* <p>
* This loads, instantiates, and initializes the catalog plugin for each call; it does not cache
* or reuse instances.
*
* @param name a String catalog name
* @param conf a SQLConf
* @return an initialized CatalogPlugin
* @throws SparkException If the plugin class cannot be found or instantiated
*/
public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
Contributor:

Should this configuration allow for loading multiple catalogs with the same name but with different contexts? For example, say I want to load a different catalog plugin for functions vs. tables, but I want them to be named the same.

My intuition is that we shouldn't allow that as it makes the behavior quite ambiguous.

Contributor Author:

I think it is reasonable to go with what is here: a name has just one implementation class. That class can implement multiple catalog interfaces, which do not conflict.

if (pluginClassName == null) {
throw new SparkException(String.format(
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
}

ClassLoader loader = Utils.getContextOrSparkClassLoader();

try {
Class<?> pluginClass = loader.loadClass(pluginClassName);

if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
throw new SparkException(String.format(
"Plugin class for catalog '%s' does not implement CatalogPlugin: %s",
name, pluginClassName));
}

CatalogPlugin plugin = CatalogPlugin.class.cast(pluginClass.newInstance());

plugin.initialize(name, catalogOptions(name, conf));

return plugin;

} catch (ClassNotFoundException e) {
throw new SparkException(String.format(
"Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName));
Member:

why not pass e for consistency?

Contributor Author:

I don't think it is useful in this case. The stack trace would be from Class.forName into the Java runtime, and all of the information from the message, like the class name, is included in this one. The stack above this call is also included in the thrown exception.

Contributor:

Nit: Then we should only wrap the Class.forName call in the try-catch - if anything else in the block throws ClassNotFoundException it will not be obvious where it was thrown from. And while ClassNotFoundException can't be thrown by any other code currently, future contributors adding code in this block can get their exceptions swallowed up.

I do think in general it's best practice to pass along the exception. Prevents us from losing any state, even if that state is noise 99.9% of the time.
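
A sketch of the narrower scoping suggested here, assuming the rest of the method stays as in the diff (this is not what the diff currently does):

Class<?> pluginClass;
try {
  pluginClass = loader.loadClass(pluginClassName);
} catch (ClassNotFoundException e) {
  throw new SparkException(String.format(
      "Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName), e);
}
// instantiation and initialization would continue outside the try-catch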


} catch (IllegalAccessException e) {
throw new SparkException(String.format(
"Failed to call public no-arg constructor for catalog '%s': %s", name, pluginClassName),
e);

} catch (InstantiationException e) {
throw new SparkException(String.format(
"Failed while instantiating plugin for catalog '%s': %s", name, pluginClassName),
e.getCause());
}
}

/**
* Extracts a named catalog's configuration from a SQLConf.
*
* @param name a catalog name
* @param conf a SQLConf
* @return a case insensitive string map of options starting with spark.sql.catalog.(name).
*/
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
options.put(matcher.group(1), entry.getValue());
}
}

return options;
}
}
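
As a usage sketch of the prefix handling above, with illustrative property values: given spark.sql.catalog.prod = com.example.MyCatalogPlugin and spark.sql.catalog.prod.connuri = http://prod.catalog.example.com:80/ in the SQLConf, a call like the following passes only the stripped key to the plugin:

CatalogPlugin catalog = Catalogs.load("prod", conf);
// catalogOptions matched "spark.sql.catalog.prod.connuri" and stripped the prefix,
// so inside initialize(name, options):
//   name                   -> "prod"
//   options.get("connuri") -> "http://prod.catalog.example.com:80/"
// If spark.sql.catalog.prod were not defined, load would throw SparkException instead.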
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util;

import org.apache.spark.annotation.Experimental;

import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* Case-insensitive map of string keys to string values.
* <p>
* This is used to pass options to v2 implementations to ensure consistent case insensitivity.
* <p>
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
* keys converted to lower case.
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {

public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap();
}

private final Map<String, String> delegate;

private CaseInsensitiveStringMap() {
this.delegate = new HashMap<>();
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}

@Override
public String get(Object key) {
return delegate.get(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public String put(String key, String value) {
return delegate.put(key.toLowerCase(Locale.ROOT), value);
}

@Override
public String remove(Object key) {
return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public void putAll(Map<? extends String, ? extends String> m) {
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}

@Override
public void clear() {
delegate.clear();
}

@Override
public Set<String> keySet() {
return delegate.keySet();
}

@Override
public Collection<String> values() {
return delegate.values();
}

@Override
public Set<Map.Entry<String, String>> entrySet() {
return delegate.entrySet();
}
}
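
A short behavior sketch of this map (values are illustrative):

CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
options.put("ConnURI", "http://prod.catalog.example.com:80/");
options.get("connuri");         // returns the URI; lookups are lower-cased first
options.containsKey("CONNURI"); // true
options.keySet();               // contains "connuri" -- keys are stored lower case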