-
Notifications
You must be signed in to change notification settings - Fork 28.7k
[SPARK-24252][SQL] Add v2 catalog plugin system #23915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalog.v2; | ||
|
||
import org.apache.spark.annotation.Experimental; | ||
import org.apache.spark.sql.internal.SQLConf; | ||
import org.apache.spark.sql.util.CaseInsensitiveStringMap; | ||
|
||
/**
 * A marker interface to provide a catalog implementation for Spark.
 * <p>
 * Implementations can provide catalog functions by implementing additional interfaces for tables,
 * views, and functions.
 * <p>
 * Catalog implementations must implement this marker interface to be loaded by
 * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
 * required public no-arg constructor. After creating an instance, it will be configured by calling
 * {@link #initialize(String, CaseInsensitiveStringMap)}.
 * <p>
 * Catalog implementations are registered to a name by adding a configuration option to Spark:
 * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
 * in the Spark configuration that share the catalog name prefix,
 * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive
 * string map of options in initialization with the prefix removed.
 * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".
 */
@Experimental
public interface CatalogPlugin {
  /**
   * Called to initialize configuration.
   * <p>
   * This method is called once, just after the provider is instantiated.
   *
   * @param name the name used to identify and load this catalog
   * @param options a case-insensitive string map of configuration
   */
  void initialize(String name, CaseInsensitiveStringMap options);

  /**
   * Called to get this catalog's name.
   * <p>
   * This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
   * called to pass the catalog's name.
   *
   * @return the name passed to {@link #initialize(String, CaseInsensitiveStringMap)}
   */
  String name();
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalog.v2; | ||
|
||
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;
|
||
@Private | ||
public class Catalogs { | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private Catalogs() { | ||
} | ||
|
||
/** | ||
* Load and configure a catalog by name. | ||
* <p> | ||
* This loads, instantiates, and initializes the catalog plugin for each call; it does not cache | ||
* or reuse instances. | ||
* | ||
* @param name a String catalog name | ||
* @param conf a SQLConf | ||
* @return an initialized CatalogPlugin | ||
* @throws SparkException If the plugin class cannot be found or instantiated | ||
*/ | ||
public static CatalogPlugin load(String name, SQLConf conf) throws SparkException { | ||
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
rdblue marked this conversation as resolved.
Show resolved
Hide resolved
|
||
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this configuration allow for loading multiple catalogs with the same name but with different contexts? For example, say I want to load a different catalog plugin for functions vs. tables, but I want them to be named the same. My intuition is that we shouldn't allow that as it makes the behavior quite ambiguous. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is reasonable to go with what is here: a name has just one implementation class. That class can implement multiple catalog interfaces, which do not conflict. |
||
if (pluginClassName == null) { | ||
throw new SparkException(String.format( | ||
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name)); | ||
} | ||
|
||
ClassLoader loader = Utils.getContextOrSparkClassLoader(); | ||
|
||
try { | ||
Class<?> pluginClass = loader.loadClass(pluginClassName); | ||
|
||
if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) { | ||
throw new SparkException(String.format( | ||
"Plugin class for catalog '%s' does not implement CatalogPlugin: %s", | ||
name, pluginClassName)); | ||
} | ||
|
||
CatalogPlugin plugin = CatalogPlugin.class.cast(pluginClass.newInstance()); | ||
|
||
plugin.initialize(name, catalogOptions(name, conf)); | ||
|
||
return plugin; | ||
|
||
} catch (ClassNotFoundException e) { | ||
throw new SparkException(String.format( | ||
"Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not pass There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think it is useful in this case. The stack trace would be from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Then we should only wrap the I do think in general it's best practice to pass along the exception. Prevents us from losing any state, even if that state is noise 99.9% of the time. |
||
|
||
} catch (IllegalAccessException e) { | ||
throw new SparkException(String.format( | ||
"Failed to call public no-arg constructor for catalog '%s': %s", name, pluginClassName), | ||
e); | ||
|
||
} catch (InstantiationException e) { | ||
throw new SparkException(String.format( | ||
"Failed while instantiating plugin for catalog '%s': %s", name, pluginClassName), | ||
e.getCause()); | ||
} | ||
} | ||
|
||
/** | ||
* Extracts a named catalog's configuration from a SQLConf. | ||
* | ||
* @param name a catalog name | ||
* @param conf a SQLConf | ||
* @return a case insensitive string map of options starting with spark.sql.catalog.(name). | ||
*/ | ||
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) { | ||
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); | ||
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); | ||
|
||
CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); | ||
for (Map.Entry<String, String> entry : allConfs.entrySet()) { | ||
Matcher matcher = prefix.matcher(entry.getKey()); | ||
if (matcher.matches() && matcher.groupCount() > 0) { | ||
options.put(matcher.group(1), entry.getValue()); | ||
} | ||
} | ||
|
||
return options; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.util; | ||
|
||
import org.apache.spark.annotation.Experimental; | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* Case-insensitive map of string keys to string values. | ||
* <p> | ||
* This is used to pass options to v2 implementations to ensure consistent case insensitivity. | ||
* <p> | ||
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return | ||
* keys converted to lower case. | ||
*/ | ||
@Experimental | ||
public class CaseInsensitiveStringMap implements Map<String, String> { | ||
|
||
public static CaseInsensitiveStringMap empty() { | ||
return new CaseInsensitiveStringMap(); | ||
} | ||
|
||
private final Map<String, String> delegate; | ||
|
||
private CaseInsensitiveStringMap() { | ||
this.delegate = new HashMap<>(); | ||
} | ||
|
||
@Override | ||
public int size() { | ||
return delegate.size(); | ||
} | ||
|
||
@Override | ||
public boolean isEmpty() { | ||
return delegate.isEmpty(); | ||
} | ||
|
||
@Override | ||
public boolean containsKey(Object key) { | ||
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT)); | ||
} | ||
|
||
@Override | ||
public boolean containsValue(Object value) { | ||
return delegate.containsValue(value); | ||
} | ||
|
||
@Override | ||
public String get(Object key) { | ||
return delegate.get(key.toString().toLowerCase(Locale.ROOT)); | ||
} | ||
|
||
@Override | ||
public String put(String key, String value) { | ||
return delegate.put(key.toLowerCase(Locale.ROOT), value); | ||
} | ||
|
||
@Override | ||
public String remove(Object key) { | ||
return delegate.remove(key.toString().toLowerCase(Locale.ROOT)); | ||
} | ||
|
||
@Override | ||
public void putAll(Map<? extends String, ? extends String> m) { | ||
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) { | ||
put(entry.getKey(), entry.getValue()); | ||
} | ||
} | ||
|
||
@Override | ||
public void clear() { | ||
delegate.clear(); | ||
} | ||
|
||
@Override | ||
public Set<String> keySet() { | ||
return delegate.keySet(); | ||
} | ||
|
||
@Override | ||
public Collection<String> values() { | ||
return delegate.values(); | ||
} | ||
|
||
@Override | ||
public Set<Map.Entry<String, String>> entrySet() { | ||
return delegate.entrySet(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's weird to ask the catalog plugin to report `name` when `initialize` already requires a name.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall this be `void initialize(CaseInsensitiveStringMap options);` instead?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The catalog is passed the name that was used to identify it. For example, say I have a REST-based catalog endpoint that I'm configuring in two cases. `MyCatalogPlugin` is instantiated and configured twice, and both times it is passed the name it is configured with: `prod` and `test`.

Adding a getter for `name` just makes it easy to identify the catalog without Spark keeping track of a name -> catalog instance mapping everywhere.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, how would the
MyCatalogPlugin
report its name?prod
ortest
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
^ depends on which catalog instance: one would say `prod`, the other would say `test`.