-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Spark - Add Spark FunctionCatalog #5377
Spark - Add Spark FunctionCatalog #5377
Conversation
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestIcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/SparkFunctions.java
Outdated
Show resolved
Hide resolved
.../v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunctionImpl.java
Outdated
Show resolved
Hide resolved
.../v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunctionImpl.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
.../v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunctionImpl.java
Outdated
Show resolved
Hide resolved
.../v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunctionImpl.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
692c07e
to
257163d
Compare
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Show resolved
Hide resolved
ce113f4
to
e1054c2
Compare
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me. I had a few minor suggestions. Thanks for working on this, @kbendick!
086ef01
to
2e9651a
Compare
Thanks for the review @aokolnychyi! I made those changes and rebased. This will hopefully help unblock parallelizing work on storage partitioned joins and things. |
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/functions/IcebergVersionFunction.java
Outdated
Show resolved
Hide resolved
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
.anyMatch(func -> "iceberg_version".equalsIgnoreCase(func.name())); | ||
|
||
Assertions.assertThat(asFunctionCatalog.listFunctions(SYSTEM_NAMESPACE)) | ||
.anyMatch(func -> "iceberg_version".equalsIgnoreCase(func.name())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I would not be permissive here. This lets Spark change the case of the function name, which we don't expect and would be really odd.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I chose this as function resolution is case-insensitive in Spark, at least according to the existing code for Procedure
's as well as my own investigation. This way, it also matches our current ProcedureCatalog
callables. I can update it though.
iceberg/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
Lines 34 to 37 in 14f4bc1
public static ProcedureBuilder newBuilder(String name) { | |
// procedure resolution is case insensitive to match the existing Spark behavior for functions | |
Supplier<ProcedureBuilder> builderSupplier = BUILDERS.get(name.toLowerCase(Locale.ROOT)); | |
return builderSupplier != null ? builderSupplier.get() : null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the tests, but built-in functions are case-insensitive by default.
Tests done on Spark 3.3 with spark.sql.caseSensitive
set to both true
and false
(made no difference either way).
scala> spark.sql("SELECT uuid() as _uuid").show()
+--------------------+
| _uuid|
+--------------------+
|3d52c2c7-225c-44c...|
+--------------------+
scala> spark.sql("SELECT UUID() as _uuid").show()
+--------------------+
| _uuid|
+--------------------+
|1babdfb6-1f71-498...|
+--------------------+
scala> spark.sql("SELECT UuID() as _uuid").show()
+--------------------+
| _uuid|
+--------------------+
|d63892a1-e2e6-49d...|
+--------------------+
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think Spark functions are case insensitive no matter what.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the tests, but built-in functions are case-insensitive by default.
That doesn't mean that Spark will change the case of names that are supplied by FunctionCatalog
. The problem isn't resolution being case insensitive, it is that this test case allows Spark to change the function name. I don't see a reason to allow that.
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
Outdated
Show resolved
Hide resolved
bdce642
to
cdfbbc6
Compare
cdfbbc6
to
9369e78
Compare
@rdblue I addressed your most recent comments. Can you please take a look? |
Merged. @kbendick, can you backport this to 3.2 as well? |
Working on it now! |
New PR for Spark 3.2: #5411 I'll ping people on it once unit tests have finished passing. |
This PR stems from #5305 and covers just the
FunctionCatalog
FunctionCatalog
This allows users of
SparkCatalog
andSparkSessionCatalog
to use functions (such as theiceberg_version
function added here) without having to register it as a UDF.The v2 functions also benefit from code generation and are significantly more efficient. For instance, this registers a logical plan of
project "0.15.0-SNAPSHOT" as value
).All Iceberg functions that we register into the function catalog are accessible when used with an Iceberg spark catalog and:
e.g.
my_catalog.iceberg_version()
.system
namespace is referenced, to match called procedure syntax.** Note ** The session catalog,
SparkSessionCatalog
or normally namedspark_catalog
, requires that the namespace being referenced exists.For the session_catalog, the namespace (the default if none is being used) will be referenced, which will not resolve. To work around this when using the session_catalog in SQL, two options are either:
spark_catalog.iceberg_version()
system.iceberg_version()
. This requires creating asystem
namespace in the session catalog, but this is the most portable solution for SQL code.Logic in Spark in Spark Analyzer to verify that the namespace exists when the function is resolved, only for the session catalog.
iceberg_version function
This also adds a simple function
iceberg_version
, which simply returns the (short) version string. This is mostly for testing but will be useful on its own.