Flink: add sql partition functions #5120
Conversation
Thanks @wuwenchi!
I left some initial feedback based on my first pass through. I'm still digesting this but had some questions and some feedback. 👍
}

  @Override
  public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-   throw new FunctionNotExistException(getName(), functionPath);
+   CatalogFunction catalogFunction = partitionFunctions.get(functionPath.getObjectName());
Are there any situations where this might get used for other reasons than just partition UDFs? Like accessing some other UDF that the user might have registered?
From the existing code, it looks fine. But for my own understanding, I’m still wondering if it’s possible for a user to register their own UDF that would be associated with this Catalog, and if so are we currently making it not possible to do that?
Are there any situations where this might get used for other reasons than just partition UDFs? Like accessing some other UDF that the user might have registered?
Here, we mainly use these UDFs to create, modify, and delete the partitionSpec through Flink.
From the existing code, it looks fine. But for my own understanding, I’m still wondering if it’s possible for a user to register their own UDF that would be associated with this Catalog, and if so are we currently making it not possible to do that?
Currently, users cannot register their own UDFs under the Flink catalog, but I will add that feature in the next step (the very next PR).
Even after we support user-registered UDFs, I would still like to provide these two functions by default, so that users do not need to register them manually when they want to query related partitions. What do you think?
Even after we support user-registered UDFs, I would still like to provide these two functions by default, so that users do not need to register them manually when they want to query related partitions. What do you think?
I agree we should register them on behalf of the users. If users will also be able to register their own functions on the catalog, we might want to pick slightly more specific names.
I have found the answers to my deleted questions 😄
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionTransformUdf.java (outdated; resolved)
public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
  Type type = TypeUtil.fromJavaType(obj);
  Transform<Object, Object> truncate = Transforms.truncate(type, num);
  Object value = truncate.apply(Literals.fromJavaType(obj).to(type).value());
Do we need to do any further type evaluation? What will happen if the type passed in is not supported?
Because InputGroup.ANY is used, any data type can be received. If a type that the function does not support is passed in, the corresponding transform throws an unsupported-operation exception, for example:
SELECT truncates(4, TIMESTAMP '2022-05-20 10:12:55.038194');
...
java.lang.UnsupportedOperationException: Cannot truncate type: timestamp
...
public static class Bucket extends ScalarFunction {
  public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
    Type type = TypeUtil.fromJavaType(obj);
    Transform<Object, Integer> bucket = Transforms.bucket(type, num);
Question for my own understanding:
Is it possible to make a typed version of the function? Is that common / desired generally speaking for Flink UDFs?
I once wrote a typed version, but it looked redundant, because only the input parameter types are different and the processing logic is exactly the same...
Dynamically calling bucket based on the Java type is really dangerous. If the type is wrong, then there are going to be correctness issues. This should define a function per supported type.
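For illustration, a per-type definition could look roughly like the following minimal sketch; the overload set shown here is an assumption for a few types only, not the final API, and the class is meant to stay nested inside PartitionTransformUdf as in the diff above.

import org.apache.flink.table.functions.ScalarFunction;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;

// Hypothetical sketch: one eval overload per supported type, so the transform's
// input type is fixed at compile time instead of being derived from the Java class.
public static class Bucket extends ScalarFunction {
  public int eval(int numBuckets, int value) {
    Transform<Integer, Integer> bucket = Transforms.bucket(Types.IntegerType.get(), numBuckets);
    return bucket.apply(value);
  }

  public int eval(int numBuckets, long value) {
    Transform<Long, Integer> bucket = Transforms.bucket(Types.LongType.get(), numBuckets);
    return bucket.apply(value);
  }

  public int eval(int numBuckets, String value) {
    Transform<String, Integer> bucket = Transforms.bucket(Types.StringType.get(), numBuckets);
    return bucket.apply(value);
  }
}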
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
tEnv.createTemporarySystemFunction("buckets", PartitionTransformUdf.Bucket.class);
tEnv.createTemporarySystemFunction("truncates", PartitionTransformUdf.Truncate.class);
Question: I see the functions are being registered in the catalog itself already.
Is this necessary whenever a user wants to use a function, to re-register it?
Ideally we have a single registration option so things are uniform in my opinion. The bucket and truncate functions are universally needed when working with Iceberg (though clearly only in some situations).
Also, for naming, is it possible that we might clash with some other name (if we registered the functions for people)?
I’m working on function registration in Spark so we should ideally get our names somewhat stabilized and similar.
I would propose iceberg_bucket and iceberg_truncate but need to better understand how it is used. Also would like to hear from users on this aspect. If the function is prefaced with a catalog name, maybe iceberg_ isn't needed, but from the examples that doesn't seem to be the case. However, I'm not sure the tests are using the functions from the catalog or if this registration path is different.
Flink supports four categories of functions:
- Temporary system functions
- System functions
- Temporary catalog functions
- Catalog functions
If the user uses an ambiguous function reference, the resolution order is:
- Temporary system function
- System function
- Temporary catalog function, in the current catalog and current database of the session
- Catalog function, in the current catalog and current database of the session
For details, see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/overview/
In the code, these two functions are registered in the catalog, so:
- If the user uses Iceberg's catalog, they can use these two functions directly.
- If the user does not use Iceberg's catalog, they need to register these two functions before using them (e.g. as done in the test).
For example:
// first, create iceberg catalog
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql("" +
"CREATE CATALOG hive_catalog WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive',\n" +
.....
" )");
// second, call function by Precise Function Reference
tEnv.executeSql("select hive_catalog.db.buckets(200, 12341)"); // you can use function directly by Precise Function Reference under hive_catalog
// third, or call function by Ambiguous Function Reference
tEnv.executeSql("use catalog hive_catalog");
tEnv.executeSql("select buckets(200, 12341)"); // you can use function directly by Ambiguous Function Reference under hive_catalog
For the ambiguous function reference here, for clarification, this (number 3 after the USE catalog call) will work?
Once this is merged, this might be something we want to document, or at least point users to the Flink documentation for function resolution when we mention these UDFs.
It will work. If use catalog hive_catalog is not executed, then the current catalog is the default_catalog, and the buckets function can't be found there.
Can you separate the tests for registering these as UDFs from the tests for calling them through the catalog? I want to make sure we have separate tests for those two cases.
long num = 10;
Integer obj = bucket.apply(num);
String expectHumanString = bucket.toHumanString(obj);
Nit: Can we have a test where the expected human string is tested? E.g. with a hard-coded string value as the expected human string, and then the actual value is the result of this call?
Looks good, it will be clearer. I will modify it.
List<Row> sql = sql("SELECT truncates(4, x'010203040506')");
Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
}
I’m still reviewing these tests, but for any types that are not supported, can we add a test checking what happens if those types are passed in?
Assuming an exception results, then AssertHelpers has some methods that should be useful.
OK, I will add test cases for different types.
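As a rough sketch, such a test could look like this; it reuses the sql(...) helper from the existing Flink test base and the error message from the truncates/TIMESTAMP example above, and assumes the UnsupportedOperationException propagates directly (if Flink wraps it, the cause would need to be unwrapped before asserting).

import org.apache.iceberg.AssertHelpers;
import org.junit.Test;

// Illustrative test sketch only, not the final test.
@Test
public void testTruncateUnsupportedType() {
  AssertHelpers.assertThrows(
      "Truncate should reject unsupported types such as TIMESTAMP",
      UnsupportedOperationException.class,
      "Cannot truncate type: timestamp",
      () -> sql("SELECT truncates(4, TIMESTAMP '2022-05-20 10:12:55.038194')"));
}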
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java (outdated; resolved)
@@ -619,12 +630,17 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS
   @Override
   public List<String> listFunctions(String dbName) throws CatalogException {
-    return Collections.emptyList();
+    return Lists.newArrayList(partitionFunctions.keySet());
I don't think it is a good idea to make the functions accessible from any database. Instead, I think you should choose where you want to expose these functions.
This interface shows all the functions under the current Iceberg catalog. My idea is that these functions are registered at the Iceberg catalog level, so all databases under the current Iceberg catalog can use them. If the user uses a non-Iceberg catalog, these functions are not visible. I think the access boundary of these functions is the catalog level, so is it possible that we don't need to restrict or display them per database?
I'm not sure whether Flink allows you to omit the database name or not, but that's up to Flink. Iceberg should not expose these functions as though they exist in all databases, though.
What you said is more reasonable. It is more appropriate for this interface to display the functions registered by the user.
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionTransformUdf.java (outdated; resolved)
Thanks for working on this, @wuwenchi! I think the approach is a little off, but we should be able to get this working soon.
}

public String eval(int num, String value) {
  Transform<String, String> truncate = Transforms.truncate(Types.StringType.get(), num);
Can you make these constants rather than looking them up each time?
Do you mean like this?
public class PartitionTransformUdf {
  public static Type stringType = Types.StringType.get();

  public static class Truncate extends ScalarFunction {
    public String eval(int num, String value) {
      Transform<String, String> truncate = Transforms.truncate(stringType, num);
      return truncate.apply(value);
    }
  }
}
No, I meant to use a shared instance of the Truncate transform rather than creating a new one each time. Can you add a static Cache to the class so that you can look up a transform by type and num?
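Something along these lines, roughly matching what a later revision of this PR does (a sketch only; the private visibility and the method name without get follow the review comments further down):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;

// Sketch: transforms are cached by (width, type) so each Truncate transform is
// built once and reused by every eval call with the same arguments.
private static final Cache<Tuple2<Integer, Type>, Transform<Object, Object>> TRUNCATE_CACHE =
    Caffeine.newBuilder().build();

private static Transform<Object, Object> truncateTransform(int num, Type type) {
  return TRUNCATE_CACHE.get(Tuple2.of(num, type), key -> Transforms.truncate(key.f1, key.f0));
}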
Transform<ByteBuffer, ByteBuffer> truncate = Transforms.truncate(Types.BinaryType.get(), num);
ByteBuffer byteBuffer = truncate.apply(wrap);
byte[] out = new byte[byteBuffer.remaining()];
byteBuffer.get(out, 0, out.length);
This should use ByteBuffers.toByteArray(buf).
done
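For reference, the updated snippet presumably looks roughly like this (a sketch; num and wrap keep the names from the diff above, wrapped in a hypothetical helper so the snippet is self-contained):

import java.nio.ByteBuffer;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;

// Sketch: ByteBuffers.toByteArray replaces the manual remaining()/get() copy.
static byte[] truncateBinary(int num, ByteBuffer wrap) {
  Transform<ByteBuffer, ByteBuffer> truncate = Transforms.truncate(Types.BinaryType.get(), num);
  return ByteBuffers.toByteArray(truncate.apply(wrap));
}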
public int eval(int num, @DataTypeHint("TIME") LocalTime value) {
  Types.TimeType type = Types.TimeType.get();
  Transform<Object, Integer> bucket = Transforms.bucket(type, num);
  long micros = TimeUnit.NANOSECONDS.toMicros(value.toNanoOfDay());
Can you use the conversion functions in DateTimeUtil?
done
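A sketch of how the eval method might look with DateTimeUtil doing the conversion (assuming DateTimeUtil.microsFromTime; the method belongs in the Bucket class shown above):

import java.time.LocalTime;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;

// Sketch: DateTimeUtil encodes TIME as microseconds of day, the representation
// Iceberg's bucket transform expects, so the manual TimeUnit conversion goes away.
public int eval(int num, @DataTypeHint("TIME") LocalTime value) {
  Transform<Long, Integer> bucket = Transforms.bucket(Types.TimeType.get(), num);
  return bucket.apply(DateTimeUtil.microsFromTime(value));
}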
@@ -624,7 +635,12 @@ public List<String> listFunctions(String dbName) throws CatalogException {
   @Override
   public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-    throw new FunctionNotExistException(getName(), functionPath);
+    CatalogFunction catalogFunction = partitionFunctions.get(functionPath.getObjectName());
I see that you've removed the listFunctions method, but that doesn't actually solve the problem -- it introduces a new one. The problem is that these functions should not be defined in every database, but that's what this getFunction implementation does, because it ignores functionPath.getDatabaseName().
Whatever this method does, listFunctions should be consistent with its behavior. If this will load a function, then listFunctions should show it.
My original idea was that these functions belong to the catalog (that may not be appropriate now) and that they are registered when the catalog is created, so that users can use them directly. That is why I provide the function in getFunction and show it in listFunctions. If the functions are no longer displayed to the user and can no longer be fetched directly (and are no longer registered in the catalog automatically), should we mention them in the documentation and provide an example of how to register and use them?
You're right that these functions should be provided by the catalog and should be automatically registered or known. That isn't the problem. The problem is that this is effectively registering the functions in every database. Instead, choose one database where you want the functions to live. For stored procedures, we use system. I think that's a good idea here.
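A rough sketch of what that could look like in FlinkCatalog (the SYSTEM_DB constant and field names are illustrative assumptions; the point is that getFunction and listFunctions agree on the single database that exposes the built-in functions):

import java.util.Collections;
import java.util.List;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Hypothetical sketch: the built-in partition functions are only visible in one
// "system" database, and lookup and listing behave consistently.
private static final String SYSTEM_DB = "system";

@Override
public List<String> listFunctions(String dbName) throws CatalogException {
  if (SYSTEM_DB.equals(dbName)) {
    return Lists.newArrayList(partitionFunctions.keySet());
  }
  return Collections.emptyList();
}

@Override
public CatalogFunction getFunction(ObjectPath functionPath)
    throws FunctionNotExistException, CatalogException {
  if (SYSTEM_DB.equals(functionPath.getDatabaseName())) {
    CatalogFunction function = partitionFunctions.get(functionPath.getObjectName());
    if (function != null) {
      return function;
    }
  }
  throw new FunctionNotExistException(getName(), functionPath);
}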
public static final Cache<Tuple2<Integer, Type>, Transform<Object, Object>> TRUNCATE_CACHE =
    Caffeine.newBuilder().build();

public static Transform<Object, Object> getTruncateTransform(int num, Type type) {
There's no need for this method to be public. Can you make it private instead? Same with the cache.
Also, Iceberg does not typically use get in method names because it isn't helping. There is usually a better verb, or it can be omitted.
done
2. Remove the default registered functions, consistent with listFunctions
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java (outdated; resolved)
      return new CatalogDatabaseImpl(Maps.newHashMap(), "");
    }
  } else {
    try {
      Map<String, String> metadata =
          Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
      String comment = metadata.remove("comment");
      registerPartitionFunction(databaseName);
@wuwenchi, these functions should not be registered in every known database. The problem before was not how you were tracking functions, it was the behavior. You need to change the behavior.
Here's my last comment:
The problem is that this is effectively registering the functions in every database. Instead, choose one database where you want the functions to live. For stored procedures, we use system. I think that's a good idea here.
Please remove the registration code that isn't needed and update the function lookup and listing to only return these functions if the database is system.
About the system database: is it just a logical database? Does that mean users cannot create tables under it? In SQL, is it not possible to execute use iceberg_catalog.system? This is related to listing functions: if you cannot run use system to switch the current database, we cannot list the functions under system, because show functions only displays the functions under the current database.
I don't see why system would be different than any other database. It should be a normal database.
If it is a normal database, should we create it when the catalog is opened, like the default database? If it is not created, the user cannot see the database but can still use the functions under the system database, which feels a little strange.
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Add partition functions so that you can easily view the partition where a value is located.
cc @kbendick thanks!