-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-37913][table] Add built-in OBJECT_OF function #26704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
if (logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { | ||
return DataTypes.STRING().nullable(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@twalthr I was not 100% that this is the correct approach! I noticed in the output of the tests that strings were used as CHAR(n)
, so I had to convert them to STRING()
. I couldn't find any util class that does this conversion, so I added the logic here.
I think there's a checkstyle violation making the CI fail. Make sure to run |
"NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = " | ||
+ "OBJECT_OF('" | ||
+ NestedType.class.getName() | ||
+ "', 'n1', OBJECT_OF('" | ||
+ Type1.class.getName() | ||
+ "', 'a', 42, 'b', 'Bob'), " | ||
+ "'n2', OBJECT_OF('" | ||
+ Type2.class.getName() | ||
+ "', 'a', 15, 'b', 'Alice'))", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if it was missed during PTF implementation
will it work with record
s?
Asking since we have a dedicated folder for java17 tests
for instance https://github.com/apache/flink/blob/1c34ca011cacdbb3b0f48b485eac89dd913d29bf/flink-tests-java17/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoRecordSerializerUpgradeTestSpecifications.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. So far we don't have any tests for records because the language level was too low.
@@ -1178,6 +1178,22 @@ valueconstruction: | |||
```sql | |||
f(columns => DESCRIPTOR(`col1`, `col2`), on_time => DESCRIPTOR(`ts`)) | |||
``` | |||
- sql: OBJECT_OF(className, [key, value [, key, value , ...]]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep in mind that Chinese version also should be updated (could be in English for now)
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectOfFunction.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Show resolved
Hide resolved
if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { | ||
throw new ValidationException( | ||
"The field key at position " | ||
+ keyIndex | ||
+ " must be a STRING/VARCHAR type, but was " | ||
+ logicalType.asSummaryString() | ||
+ "."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check if it is not null (or not nullable)?
Otherwise
output for nullable classname is confusing like
SELECT OBJECT_OF(cast(null as string), 'f0', 'a', 'b', 'Alice');
gives
Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
OBJECT_OF(STRING, CHAR(2) NOT NULL, CHAR(1) NOT NULL, CHAR(1) NOT NULL, CHAR(5) NOT NULL)
...
Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments.
which in fact satisfies the required
Supported signatures are:
OBJECT_OF(STRING, [STRING, ANY]*...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the feedback! I have addressed the nullability check and added tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how will it behave for the case when arg is nullable however we can not resolve during planning like ?
- there is a table
input
with data
class_name1
class_name2
NULL
class_name3
- and there is a query
SELECT OBJECT_OF(class_name, 'f0', 'value') FROM input
// Test with OBJECT_OF | ||
.testResult( | ||
objectOf(Type1.class, "a", 42, "b", "Bob"), | ||
"OBJECT_OF('" + Type1.class.getName() + "', 'a', 42, 'b', 'Bob')", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems subquery not supported (not sure if it is related to this PR)
this is ok
SELECT OBJECT_OF('<class_name>', 'f0', 1);
this was not able to success
SELECT OBJECT_OF('<class_name>', 'f0', (SELECT 1));
or did I miss anything?
...-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectOfFunction.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Show resolved
Hide resolved
final String fieldName = | ||
callContext | ||
.getArgumentValue(keyIdx, String.class) | ||
.orElseThrow(IllegalStateException::new); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably better to call CallContext#fail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I looked at the fail
method implementation, and it throws a ValidationError. At this point in the code, we have already validated all the inputs in the ObjectOfInputTypeStrategy
. Therefore, this case should (ideally) never occur, and if so, it should be an IllegalState.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Therefore, this case should (ideally) never occur, and if so, it should be an IllegalState.
i disagree
the simplest way to make it happen
SELECT OBJECT_OF('class_name', CAST(NULL AS STRING), 'as');
And then user will see just IllegalStateException
without any idea what to do about it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i tend to think that classname and all keys should be always non nullable
...-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ObjectOfFunction.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/flink/table/types/inference/strategies/ObjectOfInputTypeStrategy.java
Outdated
Show resolved
Hide resolved
* <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} → {@code | ||
* STRUCTURED<com.example.User>(name STRING, age INT)} | ||
* <li>{@code OBJECT_OF('com.example.Point', 'x', 1.5, 'y', 2.0)} → {@code | ||
* STRUCTURED<com.example.Point>(x DOUBLE, y DOUBLE)} | ||
* </ul> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we have validation checking that classname should satisfy jvm rules for class naming
should we have another validation for keys in order to check that field names satisfy rules for jvm field naming?
I'm asking since it seems it is possible to have
OBJECT_OF('com.example.User', '0', 'Alice', '1', 30)
and we can not have fields called 0
, and 1
in java ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same for the reserved keywords which might be used for field names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point... We can also validate if the field name has a space, a dot .
, or a hyphen -
. OBJECT_OF('com.example.User', 'field 1', 'Alice', 'field.2', 30, 'field-3', 60)
.
If we want to do the validation very specifically, we should also check against reserved JVM keywords (if
, void
, etc.).
This makes pyFlink validation complicated due to other keywords.
@twalthr what is your opinion on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this kind of validation is needed, I will extract the fieldName validation so we can use it in OBJECT_UPDATE(object, <key>, <value>)
too!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SourceVersion#isName ?
like at
Lines 560 to 568 in f612013
private static @Nullable String checkClassName(@Nullable String className) { | |
if (className != null && !SourceVersion.isName(className)) { | |
throw new ValidationException( | |
String.format( | |
"Invalid class name '%s'. The class name must comply with JVM identifier rules.", | |
className)); | |
} | |
return className; | |
} |
What is the purpose of the change
This pull request implements the
OBJECT_OF
function as specified in FLIP-520 to enable creation of structured types in SQL and Table API. The function allows users to construct structured objects from key-value pairs without requiring UDFs, making structured type creation SQL-serializable and improving the overall usability of structured types in Flink.Brief change log
OBJECT_OF
function definition toBuiltInFunctionDefinitions
ObjectOfInputTypeStrategy
for input validation (odd argument count, string keys, unique field names)ObjectOfTypeStrategy
for output type inference (creates structured types)ObjectOfFunction
runtime implementation that createsRowData
from key-value pairsExpressions.java
withobjectOf()
methods for Table API supportobject_of()
function inexpressions.py
OBJECT_OF
function detailsVerifying this change
This change added tests and can be verified as follows:
ObjectOfInputTypeStrategy
to validate argument count, type validation, and field name uniquenessStructuredFunctionsITCase
to test end-to-end functionalitySELECT OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)
objectOf(User.class, "name", "Bob", "age", 42)
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: yes (additions toExpressions.java
andBuiltInFunctionDefinitions
)Documentation