Skip to content

Commit

Permalink
Spark 3.2: Backport FunctionCatalog to Spark 3.2 (apache#5411)
Browse files Browse the repository at this point in the history
  • Loading branch information
kbendick authored Aug 1, 2022
1 parent e05f2bb commit 8fb88a7
Show file tree
Hide file tree
Showing 4 changed files with 333 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,27 @@
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.spark.functions.SparkFunctions;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;

abstract class BaseCatalog
implements StagingTableCatalog, ProcedureCatalog, SupportsNamespaces, HasIcebergCatalog {
implements StagingTableCatalog,
ProcedureCatalog,
SupportsNamespaces,
HasIcebergCatalog,
FunctionCatalog {

@Override
public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
Expand All @@ -38,7 +47,7 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException

// namespace resolution is case insensitive until we have a way to configure case sensitivity in
// catalogs
if (namespace.length == 1 && namespace[0].equalsIgnoreCase("system")) {
if (isSystemNamespace(namespace)) {
ProcedureBuilder builder = SparkProcedures.newBuilder(name);
if (builder != null) {
return builder.withTableCatalog(this).build();
Expand All @@ -47,4 +56,40 @@ public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException

throw new NoSuchProcedureException(ident);
}

@Override
public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
if (namespace.length == 0 || isSystemNamespace(namespace)) {
return SparkFunctions.list().stream()
.map(name -> Identifier.of(namespace, name))
.toArray(Identifier[]::new);
} else if (namespaceExists(namespace)) {
return new Identifier[0];
}

throw new NoSuchNamespaceException(namespace);
}

@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
String[] namespace = ident.namespace();
String name = ident.name();

// Allow for empty namespace, as Spark's storage partitioned joins look up
// the corresponding functions to generate transforms for partitioning
// with an empty namespace, such as `bucket`.
// Otherwise, use `system` namespace.
if (namespace.length == 0 || isSystemNamespace(namespace)) {
UnboundFunction func = SparkFunctions.load(name);
if (func != null) {
return func;
}
}

throw new NoSuchFunctionException(ident);
}

private static boolean isSystemNamespace(String[] namespace) {
return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.functions;

import org.apache.iceberg.IcebergBuild;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.functions.BoundFunction;
import org.apache.spark.sql.connector.catalog.functions.ScalarFunction;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

/**
* A function for use in SQL that returns the current Iceberg version, e.g. {@code SELECT
* system.iceberg_version()} will return a String such as "0.14.0" or "0.15.0-SNAPSHOT"
*/
public class IcebergVersionFunction implements UnboundFunction {
@Override
public BoundFunction bind(StructType inputType) {
if (inputType.fields().length > 0) {
throw new UnsupportedOperationException(
String.format("Cannot bind: %s does not accept arguments", name()));
}

return new IcebergVersionFunctionImpl();
}

@Override
public String description() {
return name() + " - Returns the runtime Iceberg version";
}

@Override
public String name() {
return "iceberg_version";
}

// Implementing class cannot be private, otherwise Spark is unable to access the static invoke
// function during code-gen and calling the function fails
static class IcebergVersionFunctionImpl implements ScalarFunction<UTF8String> {
private static final UTF8String VERSION = UTF8String.fromString(IcebergBuild.version());

// magic function used in code-gen. must be named `invoke`.
public static UTF8String invoke() {
return VERSION;
}

@Override
public DataType[] inputTypes() {
return new DataType[0];
}

@Override
public DataType resultType() {
return DataTypes.StringType;
}

@Override
public boolean isResultNullable() {
return false;
}

@Override
public String canonicalName() {
return "iceberg." + name();
}

@Override
public String name() {
return "iceberg_version";
}

@Override
public UTF8String produceResult(InternalRow input) {
return invoke();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.functions;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

public class SparkFunctions {

private SparkFunctions() {}

private static final Map<String, UnboundFunction> FUNCTIONS =
ImmutableMap.of("iceberg_version", new IcebergVersionFunction());

private static final List<String> FUNCTION_NAMES = ImmutableList.copyOf(FUNCTIONS.keySet());

// Functions that are added to all Iceberg catalogs should be accessed with the `system`
// namespace. They can also be accessed with no namespace at all if qualified with the
// catalog name, e.g. my_hadoop_catalog.iceberg_version().
// As namespace resolution is handled by those rules in BaseCatalog, a list of names
// alone is returned.
public static List<String> list() {
return FUNCTION_NAMES;
}

public static UnboundFunction load(String name) {
// function resolution is case-insensitive to match the existing Spark behavior for functions
return FUNCTIONS.get(name.toLowerCase(Locale.ROOT));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.functions.IcebergVersionFunction;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestFunctionCatalog extends SparkTestBaseWithCatalog {
private static final String[] EMPTY_NAMESPACE = new String[] {};
private static final String[] SYSTEM_NAMESPACE = new String[] {"system"};
private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
private static final String[] DB_NAMESPACE = new String[] {"db"};
private final FunctionCatalog asFunctionCatalog;

public TestFunctionCatalog() {
this.asFunctionCatalog = castToFunctionCatalog(catalogName);
}

@Before
public void createDefaultNamespace() {
sql("CREATE NAMESPACE IF NOT EXISTS %s", catalogName + ".default");
}

@After
public void dropDefaultNamespace() {
sql("DROP NAMESPACE IF EXISTS %s", catalogName + ".default");
}

@Test
public void testListFunctionsViaCatalog() throws NoSuchNamespaceException {
Assertions.assertThat(asFunctionCatalog.listFunctions(EMPTY_NAMESPACE))
.anyMatch(func -> "iceberg_version".equals(func.name()));

Assertions.assertThat(asFunctionCatalog.listFunctions(SYSTEM_NAMESPACE))
.anyMatch(func -> "iceberg_version".equals(func.name()));

Assert.assertArrayEquals(
"Listing functions in an existing namespace that's not system should not throw",
new Identifier[0],
asFunctionCatalog.listFunctions(DEFAULT_NAMESPACE));

AssertHelpers.assertThrows(
"Listing functions in a namespace that does not exist should throw",
NoSuchNamespaceException.class,
"Namespace 'db' not found",
() -> asFunctionCatalog.listFunctions(DB_NAMESPACE));
}

@Test
public void testLoadFunctions() throws NoSuchFunctionException {
for (String[] namespace : ImmutableList.of(EMPTY_NAMESPACE, SYSTEM_NAMESPACE)) {
Identifier identifier = Identifier.of(namespace, "iceberg_version");
UnboundFunction func = asFunctionCatalog.loadFunction(identifier);

Assertions.assertThat(func)
.isNotNull()
.isInstanceOf(UnboundFunction.class)
.isExactlyInstanceOf(IcebergVersionFunction.class);
}

AssertHelpers.assertThrows(
"Cannot load a function if it's not used with the system namespace or the empty namespace",
NoSuchFunctionException.class,
"Undefined function: default.iceberg_version",
() -> asFunctionCatalog.loadFunction(Identifier.of(DEFAULT_NAMESPACE, "iceberg_version")));

Identifier undefinedFunction = Identifier.of(SYSTEM_NAMESPACE, "undefined_function");
AssertHelpers.assertThrows(
"Cannot load a function that does not exist",
NoSuchFunctionException.class,
"Undefined function: system.undefined_function",
() -> asFunctionCatalog.loadFunction(undefinedFunction));

AssertHelpers.assertThrows(
"Using an undefined function from SQL should fail analysis",
AnalysisException.class,
"Undefined function",
() -> sql("SELECT undefined_function(1, 2)"));
}

@Test
public void testCallingFunctionInSQLEndToEnd() {
String buildVersion = IcebergBuild.version();

Assert.assertEquals(
"Should be able to use the Iceberg version function from the fully qualified system namespace",
buildVersion,
scalarSql("SELECT %s.system.iceberg_version()", catalogName));

Assert.assertEquals(
"Should be able to use the Iceberg version function when fully qualified without specifying a namespace",
buildVersion,
scalarSql("SELECT %s.iceberg_version()", catalogName));

sql("USE %s", catalogName);

Assert.assertEquals(
"Should be able to call iceberg_version from system namespace without fully qualified name when using Iceberg catalog",
buildVersion,
scalarSql("SELECT system.iceberg_version()"));

Assert.assertEquals(
"Should be able to call iceberg_version from empty namespace without fully qualified name when using Iceberg catalog",
buildVersion,
scalarSql("SELECT iceberg_version()"));
}

private FunctionCatalog castToFunctionCatalog(String name) {
return (FunctionCatalog) spark.sessionState().catalogManager().catalog(name);
}
}

0 comments on commit 8fb88a7

Please sign in to comment.