
Spark - Implement FunctionCatalog and Truncate #5305

Closed

Conversation

kbendick
Contributor

@kbendick kbendick commented Jul 19, 2022

Implements FunctionCatalog for Spark 3.3 and implements all variants of Truncate.

FunctionCatalog

This allows users of SparkCatalog and SparkSessionCatalog to use truncate without having to register it as a UDF.

All Iceberg functions that we register into the function catalog are accessible when used with an Iceberg spark catalog and:

  1. When no namespace is referenced (the storage-partitioned joins implementation requires this),
    e.g. my_catalog.truncate(width, value).
    Note: calling truncate(width, value) on its own typically does not work, as Spark adds the current namespace to the call; system.truncate should be preferred.
  2. When the system namespace is referenced, matching the stored-procedure call syntax,
    e.g. my_catalog.system.truncate(width, value) or system.truncate(6, column).
    Note that this currently works only with SparkCatalog, as SparkSessionCatalog has logic in Spark that verifies the namespace exists.

Truncate

The truncate function also accepts a dynamic width, i.e. the width may come from a column. In practice the width will usually be static for a given call, since the function is mostly intended to match partition transforms (for example in joins, or to derive a new column from a non-partition column's data without needing to partition on it).
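As a rough illustration (plain Java, not the actual Iceberg implementation), the difference between a static width and a per-row dynamic width looks like this; the floor-mod arithmetic matches Iceberg's integer truncate transform:

```java
public class TruncateWidthDemo {
  // Iceberg-style integer truncation: floor the value down to a multiple of
  // width. Math.floorMod keeps the result correct for negative values.
  static int truncate(int width, int value) {
    return value - Math.floorMod(value, width);
  }

  public static void main(String[] args) {
    int[] values = {7, 12, -1};

    // Static width: the common case, matching a table's partition transform.
    for (int v : values) {
      System.out.println(truncate(10, v)); // prints 0, 10, -10
    }

    // Dynamic width: width supplied per row, e.g. read from another column.
    int[] widths = {2, 5, 10};
    for (int i = 0; i < values.length; i++) {
      System.out.println(truncate(widths[i], values[i])); // prints 6, 10, -10
    }
  }
}
```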

This PR refactors the definitions of the transform functions into a utility class where needed, so that Spark's magic functions can call them via the static invoke method without duplicating logic. This also allows Spark to include the functions in codegen.

Special Considerations for Using Function Catalog Efficiently via Magic Functions and Code Gen

The requirements for magic functions to be used with codegen include that:

  1. invoke is a static method
  2. invoke takes in the primitive types / native Spark types corresponding to each of Spark's input DataTypes (e.g. int for IntegerType and UTF8String for StringType).
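A minimal sketch of that magic-function shape follows. It deliberately omits the actual Spark ScalarFunction interface and UTF8String type, and the class and method names are illustrative rather than Iceberg's real ones:

```java
// Sketch of a codegen-friendly "magic" function: a static invoke method whose
// parameters are the primitive / native types for the Spark input DataTypes
// (e.g. int for IntegerType; Spark would pass UTF8String for StringType).
public class TruncateFunctions {

  // IntegerType inputs arrive as primitive int.
  public static int invoke(int width, int value) {
    // Floor the value down to a multiple of width (correct for negatives too).
    return value - Math.floorMod(value, width);
  }

  // StringType inputs arrive as UTF8String in Spark; a plain String stand-in
  // is used here. Real code should truncate by code points, not Java chars.
  public static String invoke(int width, String value) {
    return value.substring(0, Math.min(width, value.length()));
  }
}
```

Because invoke is static and takes unboxed primitives, Spark can call it directly from generated code instead of going through a row-based interface.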

Further documentation on the magic functions can be found in the ScalarFunction Javadoc.

This partially closes #5349

@github-actions github-actions bot added the spark label Jul 19, 2022
@kbendick
Contributor Author

After this, I'll add bucket and zorder as well.

This is to facilitate the usage of the various transforms from PySpark as well as SQL.

Additionally, having a zorder function will make it possible for people to pre-sort their data on write, versus having to z-order sort it when running a data compaction job.

@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch 4 times, most recently from b6f983d to ac78639 Compare July 19, 2022 23:32
@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch from bd734c6 to fa5ad6e Compare July 20, 2022 19:36
@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch 3 times, most recently from 1e21a4b to bf3b2fd Compare July 26, 2022 16:57
@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch 5 times, most recently from 05ad451 to f81e780 Compare July 26, 2022 21:23
@kbendick
Contributor Author

@rdblue PTAL. I was going to tag other people as well.

@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch from 2743a2a to 0f5c8f8 Compare July 26, 2022 21:28
@kbendick
Contributor Author

@rdblue Do you think I should break this into 2 PRs?

@kbendick
Contributor Author

Looking at the storage partitioned joins, it looks like the function bucket will need to be resolvable on the empty namespace. I can update that here or in a follow-up:

https://github.com/apache/spark/blob/e9eb28e27d10497c8b36774609823f4bbd2c8500/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala#L84-L100

@aokolnychyi
Contributor

I'd love to take a look as well. I should have some time in a day. We also had some progress on bucketed joins internally.

@kbendick
Contributor Author

> I'd love to take a look as well. I should have some time in a day. We also had some progress on bucketed joins internally.

Thanks Anton. Was going to tag you today now that it's cleaned up. Also cc @huaxingao @flyrain @nastra @Fokko

@kbendick
Contributor Author

kbendick commented Jul 27, 2022

Right now we're requiring that calls use the system namespace, but the storage partitioned join implementation looks for a function called bucket in the FunctionCatalog using an empty array for the namespace, per this diff in the merged PR in Spark for Storage Partitioned Joins inside V2ExpressionUtils#toCatalystTransform.

So we should probably allow the empty namespace to resolve functions as well.
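A hypothetical sketch of such a namespace check (the class and method names here are illustrative, not Iceberg's actual code):

```java
public class FunctionNamespaces {
  // Accept the empty namespace (needed by storage-partitioned joins, which
  // look functions up with an empty namespace array) as well as "system",
  // which matches the stored-procedure call syntax.
  static boolean isValidNamespace(String[] namespace) {
    return namespace.length == 0
        || (namespace.length == 1 && "system".equals(namespace[0]));
  }

  public static void main(String[] args) {
    System.out.println(isValidNamespace(new String[0]));            // true
    System.out.println(isValidNamespace(new String[] {"system"}));  // true
    System.out.println(isValidNamespace(new String[] {"db"}));      // false
  }
}
```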

@kbendick
Contributor Author

kbendick commented Jul 27, 2022

Link to the code that resolves our own bucket implementation in case the link above doesn't resolve: https://github.com/apache/spark/blob/47f0303944abb11d3018186bc125113772eff8ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/V2ExpressionUtils.scala#L84-L100

@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch 5 times, most recently from 16626eb to ed2f738 Compare July 27, 2022 19:10
@kbendick kbendick force-pushed the kb-add-spark-function-catalog branch from ed2f738 to 24efcba Compare July 28, 2022 19:44
@kbendick
Contributor Author

Because this PR is so big, I'm going to separate the FunctionCatalog implementation from Truncate.

I'm going to add a very simple function to be able to test it but keep the code to review a lot smaller. 👍

@kbendick
Contributor Author

I've opened #5377 to cover just the FunctionCatalog.

This PR is too big, and this way we can focus on just the FunctionCatalog business without having to worry about the details of truncate.

I've added an iceberg_version function in the other PR to assist with testing.

@kbendick
Contributor Author

kbendick commented Aug 2, 2022

This PR is closed in favor of #5377 and #5411.

I'll open a PR for Truncate and link it shortly.

Successfully merging this pull request may close these issues.

Implement Spark’s FunctionCatalog for Existing Transformations
3 participants