Flink: add sql partition functions #5120

Closed
wants to merge 15 commits into from

Conversation

wuwenchi
Contributor

Add partition functions so that you can easily see which partition a value falls into.
cc @kbendick thanks!

Contributor

@kbendick kbendick left a comment


Thanks @wuwenchi!

I left some initial feedback based on my first pass through. I'm still digesting this, but I had some questions and some feedback. 👍

}

@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-   throw new FunctionNotExistException(getName(), functionPath);
+   CatalogFunction catalogFunction = partitionFunctions.get(functionPath.getObjectName());
Contributor


Are there any situations where this might get used for other reasons than just partition UDFs? Like accessing some other UDF that the user might have registered?

From the existing code, it looks fine. But for my own understanding, I’m still wondering if it’s possible for a user to register their own UDF that would be associated with this Catalog, and if so are we currently making it not possible to do that?

Contributor Author


> Are there any situations where this might get used for other reasons than just partition UDFs? Like accessing some other UDF that the user might have registered?

Here, we mainly use these UDFs to create, modify, and delete the partitionSpec through Flink.

> From the existing code, it looks fine. But for my own understanding, I’m still wondering if it’s possible for a user to register their own UDF that would be associated with this Catalog, and if so are we currently making it not possible to do that?

Currently, users cannot register their own UDFs under the Flink catalog, but in the next step (the very next PR) I will add this feature.

But even after we support user-registered UDFs, I still want to add these two functions by default, so that users do not need to register them manually when they want to query related partitions. What do you think about this?

Contributor


> But even after we support user-registered UDFs, I still want to add these two functions by default, so that users do not need to register them manually when they want to query related partitions. What do you think about this?

I agree we should register them on behalf of the users. If users will also be able to register their own functions on the catalog, we might want to pick slightly more specific names.

Contributor


I have found the answers for my deleted questions 😄

public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
  Type type = TypeUtil.fromJavaType(obj);
  Transform<Object, Object> truncate = Transforms.truncate(type, num);
  Object value = truncate.apply(Literals.fromJavaType(obj).to(type).value());
Contributor


Do we need to do any more further type evaluation? What will happen if the type passed in is not supported?

Contributor Author


Because InputGroup.ANY is used, any data type can be accepted. If a type that the function does not support is passed in, the corresponding transform throws an unsupported-operation exception, for example:

SELECT truncates(4, TIMESTAMP '2022-05-20 10:12:55.038194');
...
  java.lang.UnsupportedOperationException: Cannot truncate type: timestamp
...

public static class Bucket extends ScalarFunction {
  public String eval(int num, @DataTypeHint(inputGroup = InputGroup.ANY) Object obj) {
    Type type = TypeUtil.fromJavaType(obj);
    Transform<Object, Integer> bucket = Transforms.bucket(type, num);
Contributor


Question for my own understanding:

Is it possible to make a typed version of the function? Is that common / desired generally speaking for Flink UDFs?

Contributor Author


I once wrote a typed version, but it looked redundant, because only the input parameter types differ and the processing logic is exactly the same...

Contributor


Dynamically calling bucket based on the Java type is really dangerous. If the type is wrong, then there are going to be correctness issues. This should define a function per supported type.
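For illustration, a per-type set of overloads might look roughly like this (a minimal sketch, not the code in this PR; it assumes the usual Transforms/Types imports and Flink's ScalarFunction):

public static class Bucket extends ScalarFunction {
  // one eval overload per supported Iceberg type, so the input type is fixed at compile time
  public int eval(int num, int value) {
    return Transforms.bucket(Types.IntegerType.get(), num).apply(value);
  }

  public int eval(int num, long value) {
    return Transforms.bucket(Types.LongType.get(), num).apply(value);
  }

  public int eval(int num, String value) {
    return Transforms.bucket(Types.StringType.get(), num).apply(value);
  }
}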

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
tEnv = StreamTableEnvironment.create(env);
tEnv.createTemporarySystemFunction("buckets", PartitionTransformUdf.Bucket.class);
tEnv.createTemporarySystemFunction("truncates", PartitionTransformUdf.Truncate.class);
Contributor

@kbendick kbendick Jun 23, 2022


Question: I see the functions are being registered in the catalog itself already.

Is it necessary for a user to re-register the function whenever they want to use it?

Ideally we have a single registration option so things are uniform in my opinion. The bucket and truncate functions are universally needed when working with Iceberg (though clearly only in some situations).

Also, for naming, is it possible that we might clash with some other name (if we registered the functions for people)?

I’m working on function registration in Spark so we should ideally get our names somewhat stabilized and similar.

I would propose iceberg_bucket and iceberg_truncate but need to better understand how it is used. Also would like to hear from users on this aspect. If the function is prefaced with a catalog name, maybe iceberg_ isn’t needed but from the examples that doesn’t seem to be the case. However, I’m not sure the tests are using the functions from the catalog or if this registration path is different.

Contributor Author


Flink supports 4 categories of functions:

  1. Temporary system functions
  2. System functions
  3. Temporary catalog functions
  4. Catalog functions

If the user uses an ambiguous function reference, the resolution order is:

  1. Temporary system function
  2. System function
  3. Temporary catalog function, in the current catalog and current database of the session
  4. Catalog function, in the current catalog and current database of the session

For details, see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/overview/

In the code, these two functions are registered in the catalog, so:

  1. If the user uses Iceberg's catalog, they can use these two functions directly.
  2. If the user does not use Iceberg's catalog, they need to register these two functions before using them (e.g., as done in the test).

Contributor Author


e.g.:

  // first, create the iceberg catalog
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  tEnv = StreamTableEnvironment.create(env);
  tEnv.executeSql("" +
      "CREATE CATALOG hive_catalog WITH (\n" +
      "    'type'='iceberg',\n" +
      "    'catalog-type'='hive',\n" +
      .....
      "  )");

  // second, call the function by Precise Function Reference under hive_catalog
  tEnv.executeSql("select hive_catalog.db.buckets(200, 12341)");

  // third, or call the function by Ambiguous Function Reference under hive_catalog
  tEnv.executeSql("use catalog hive_catalog");
  tEnv.executeSql("select buckets(200, 12341)");

Contributor

@kbendick kbendick Jun 27, 2022


For clarification on the ambiguous function reference here: this (number 3, after the USE CATALOG call) will work?

Once this is merged, this might be something we want to document, or at least point users to the Flink documentation for function resolution when we mention these UDFs.

Contributor Author


It will work. If use catalog hive_catalog is not executed, then the current catalog is the default_catalog, and the buckets function can't be found there.

Contributor


Can you separate the tests for registering these as UDFs from the tests for calling them through the catalog? I want to make sure we have separate tests for those two cases.
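For example, the two cases might be split like this (a hedged sketch reusing the helpers already used in these tests; method names and values are illustrative):

@Test
public void testBucketAsTemporarySystemFunction() {
  // registration path: the UDF is registered explicitly on the table environment
  tEnv.createTemporarySystemFunction("buckets", PartitionTransformUdf.Bucket.class);
  Transform<Long, Integer> bucket = Transforms.bucket(Types.LongType.get(), 10);
  String expected = bucket.toHumanString(bucket.apply(100L));
  Assert.assertEquals(expected, sql("SELECT buckets(10, CAST(100 AS BIGINT))").get(0).getField(0));
}

@Test
public void testBucketThroughIcebergCatalog() {
  // catalog path: no explicit registration, the function is resolved through the Iceberg catalog
  sql("USE CATALOG hive_catalog");
  Transform<Long, Integer> bucket = Transforms.bucket(Types.LongType.get(), 10);
  String expected = bucket.toHumanString(bucket.apply(100L));
  Assert.assertEquals(expected, sql("SELECT buckets(10, CAST(100 AS BIGINT))").get(0).getField(0));
}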


long num = 10;
Integer obj = bucket.apply(num);
String expectHumanString = bucket.toHumanString(obj);
Contributor


Nit: Can we have a test where the expected human string is tested? Eg with a hard coded string value as the expected human string and then the actual value is the result of this call?
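For instance, with a hard-coded expectation (a minimal sketch; truncating the string 'iceberg' to width 4 yields "iceb"):

@Test
public void testTruncateHumanString() {
  List<Row> result = sql("SELECT truncates(4, 'iceberg')");
  Assert.assertEquals("iceb", result.get(0).getField(0));
}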

Contributor Author


Looks good, it will be clearer. I will modify it.


List<Row> sql = sql("SELECT truncates(4, x'010203040506')");
Assert.assertEquals(expectHumanString, sql.get(0).getField(0));
}
Contributor


I’m still reviewing these tests, but for any types that are not supported, can we add a test checking what happens if those types are passed in?

Assuming an exception results, then AssertHelpers has some methods that should be useful.
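A hedged sketch of such a test (the exception may be wrapped by Flink's execution stack before it reaches the test, so the expected class and message likely need adjusting):

@Test
public void testTruncateUnsupportedType() {
  AssertHelpers.assertThrows(
      "Truncate should reject unsupported types",
      UnsupportedOperationException.class,
      "Cannot truncate type",
      () -> sql("SELECT truncates(4, TIMESTAMP '2022-05-20 10:12:55.038194')"));
}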

Contributor Author


OK, I will add test cases for different types.

@@ -619,12 +630,17 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS

@Override
public List<String> listFunctions(String dbName) throws CatalogException {
-   return Collections.emptyList();
+   return Lists.newArrayList(partitionFunctions.keySet());
Contributor


I don't think it is a good idea to make the functions accessible from any database. Instead, I think you should choose where you want to expose these functions.

Contributor Author


This interface shows all the functions under the current Iceberg catalog. My idea is that these functions are registered at the Iceberg catalog level, so all databases under the current Iceberg catalog can use them. If the user uses a non-Iceberg catalog, these functions are not visible. I think the access boundary of these functions is the catalog level, so is it possible that we don't need to display or limit them per database?

Contributor


I'm not sure whether Flink allows you to omit the database name or not, but that's up to Flink. Iceberg should not expose these functions as though they exist in all databases, though.

Contributor Author


What you said makes more sense. It is more appropriate for this interface to display only the functions registered by the user.

Contributor

@rdblue rdblue left a comment


Thanks for working on this, @wuwenchi! I think the approach is a little off, but we should be able to get this working soon.

@github-actions github-actions bot removed the API label Jul 5, 2022
}

public String eval(int num, String value) {
  Transform<String, String> truncate = Transforms.truncate(Types.StringType.get(), num);
Contributor


Can you make these constants rather than looking them up each time?

Contributor Author


Do you mean like this?

public class PartitionTransformUdf {

  public static Type stringType = Types.StringType.get();

  public static class Truncate extends ScalarFunction {
    public String eval(int num, String value) {
      Transform<String, String> truncate = Transforms.truncate(stringType, num);
      return truncate.apply(value);
    }
  }
}

Contributor


No, I meant to use a shared instance of the Truncate transform rather than creating a new one each time. Can you add a static Cache to the class so that you can look up a transform by type and num?

Transform<ByteBuffer, ByteBuffer> truncate = Transforms.truncate(Types.BinaryType.get(), num);
ByteBuffer byteBuffer = truncate.apply(wrap);
byte[] out = new byte[byteBuffer.remaining()];
byteBuffer.get(out, 0, out.length);
Contributor


This should use ByteBuffers.toByteArray(buf).
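That is, the manual copy above would collapse to something like the following (assuming org.apache.iceberg.util.ByteBuffers is imported):

byte[] out = ByteBuffers.toByteArray(byteBuffer);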

Contributor Author


done

public int eval(int num, @DataTypeHint("TIME") LocalTime value) {
Types.TimeType type = Types.TimeType.get();
Transform<Object, Integer> bucket = Transforms.bucket(type, num);
long micros = TimeUnit.NANOSECONDS.toMicros(value.toNanoOfDay());
Contributor


Can you use the conversion functions in DateTimeUtil?
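Presumably something like the following single call, assuming DateTimeUtil's LocalTime-to-micros helper applies here:

long micros = DateTimeUtil.microsFromTime(value);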

Contributor Author


done

@github-actions github-actions bot added the build label Jul 9, 2022
@@ -624,7 +635,12 @@ public List<String> listFunctions(String dbName) throws CatalogException {

@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
-   throw new FunctionNotExistException(getName(), functionPath);
+   CatalogFunction catalogFunction = partitionFunctions.get(functionPath.getObjectName());
Contributor


I see that you've removed the listFunctions method, but that doesn't actually solve the problem -- it introduces a new one. The problem is that these functions should not be defined in every database, but that's what this getFunction implementation does because it ignores functionPath.getDatabaseName().

Whatever this method does, listFunctions should be consistent with its behavior. If this will load a function, then listFunctions should show it.

Contributor Author


My original idea was that these functions belong to the catalog (that may not be appropriate now) and that they are registered when the catalog is created, so users can use them directly. So I provided the functions in getFunction and showed them in listFunctions. If the functions are no longer displayed to the user and can no longer be retrieved directly (and are no longer registered in the catalog automatically), should we mention them in the documentation and provide an example of how to register and use them?

Contributor


You're right that these functions should be provided by the catalog and should be automatically registered or known. That isn't the problem. The problem is that this is effectively registering the functions in every database. Instead, choose one database where you want the functions to live. For stored procedures, we use system. I think that's a good idea here.

public static final Cache<Tuple2<Integer, Type>, Transform<Object, Object>> TRUNCATE_CACHE =
    Caffeine.newBuilder().build();

public static Transform<Object, Object> getTruncateTransform(int num, Type type) {
  // look up (or compute and cache) the truncate transform for this width and type
  return TRUNCATE_CACHE.get(Tuple2.of(num, type), key -> Transforms.truncate(type, num));
}
Contributor


There's no need for this method to be public. Can you make it private instead? Same with the cache.

Also, Iceberg does not typically use get in method names because it isn't helping. There is usually a better verb, or it can be omitted.

Contributor Author


done

吴文池 added 2 commits July 14, 2022 10:12
2. Remove the default registered function, consistent with the listFunction
    return new CatalogDatabaseImpl(Maps.newHashMap(), "");
  }
} else {
  try {
    Map<String, String> metadata =
        Maps.newHashMap(asNamespaceCatalog.loadNamespaceMetadata(toNamespace(databaseName)));
    String comment = metadata.remove("comment");
+   registerPartitionFunction(databaseName);
Contributor


@wuwenchi, these functions should not be registered in every known database. The problem before was not how you were tracking functions, it was the behavior. You need to change the behavior.

Here's my last comment:

> The problem is that this is effectively registering the functions in every database. Instead, choose one database where you want the functions to live. For stored procedures, we use system. I think that's a good idea here.

Please remove the registration code that isn't needed and update the function lookup and listing to only return these functions if the database is system.
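A rough sketch of that gating (the "system" literal and the partitionFunctions map are taken from this thread; this is illustrative, not the final change):

@Override
public List<String> listFunctions(String dbName) throws CatalogException {
  if ("system".equalsIgnoreCase(dbName)) {
    return Lists.newArrayList(partitionFunctions.keySet());
  }
  return Collections.emptyList();
}

@Override
public CatalogFunction getFunction(ObjectPath functionPath)
    throws FunctionNotExistException, CatalogException {
  if ("system".equalsIgnoreCase(functionPath.getDatabaseName())) {
    CatalogFunction function = partitionFunctions.get(functionPath.getObjectName());
    if (function != null) {
      return function;
    }
  }
  throw new FunctionNotExistException(getName(), functionPath);
}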

Contributor Author


About the system database: is it just a logical database? Would users be unable to create tables under it? In SQL, is it not possible to execute use iceberg_catalog.system? This matters for listing the functions: if you cannot run use system to switch the current database, we cannot list the functions under system, because show functions only displays the functions under the current database.

Contributor


I don't see why system would be different than any other database. It should be a normal database.

Contributor Author


If it is a normal database, should we create it when the catalog is opened, like the default database? Because if it is not created, the user cannot see the database but can still use the functions under the system database, which feels a little strange.


This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 16, 2024

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Aug 26, 2024