Skip to content

[FLINK-37913][table] Add built-in OBJECT_OF function #26704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

raminqaf
Copy link

What is the purpose of the change

This pull request implements the OBJECT_OF function as specified in FLIP-520 to enable creation of structured types in SQL and Table API. The function allows users to construct structured objects from key-value pairs without requiring UDFs, making structured type creation SQL-serializable and improving the overall usability of structured types in Flink.

Brief change log

  • Added OBJECT_OF function definition to BuiltInFunctionDefinitions
  • Implemented ObjectOfInputTypeStrategy for input validation (odd argument count, string keys, unique field names)
  • Implemented ObjectOfTypeStrategy for output type inference (creates structured types)
  • Added ObjectOfFunction runtime implementation that creates RowData from key-value pairs
  • Enhanced Expressions.java with objectOf() methods for Table API support
  • Added Python API support with object_of() function in expressions.py
  • Updated SQL functions documentation with OBJECT_OF function details

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for ObjectOfInputTypeStrategy to validate argument count, type validation, and field name uniqueness
  • Added integration tests in StructuredFunctionsITCase to test end-to-end functionality
  • Verified SQL usage: SELECT OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)
  • Verified Table API usage: objectOf(User.class, "name", "Bob", "age", 42)

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (additions to Expressions.java and BuiltInFunctionDefinitions)
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs (updated SQL functions documentation, comprehensive JavaDocs for all new classes and methods, Python API documentation)

@flinkbot
Copy link
Collaborator

flinkbot commented Jun 19, 2025

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

Comment on lines +116 to +122
if (logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
return DataTypes.STRING().nullable();
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@twalthr I was not 100% that this is the correct approach! I noticed in the output of the tests that strings were used as CHAR(n), so I had to convert them to STRING(). I couldn't find any util class that does this conversion, so I added the logic here.

@gustavodemorais
Copy link
Contributor

I think there's a checkstyle violation making the CI fail. Make sure to run ./mvnw checkstyle:check locally

Comment on lines +126 to +132
"NestedConstructor(Type1Constructor(f0, f1), Type2Constructor(15, 'Alice')) = "
+ "OBJECT_OF('"
+ NestedType.class.getName()
+ "', 'n1', OBJECT_OF('"
+ Type1.class.getName()
+ "', 'a', 42, 'b', 'Bob'), "
+ "'n2', OBJECT_OF('"
+ Type2.class.getName()
+ "', 'a', 15, 'b', 'Alice'))",
Copy link
Contributor

@snuyanzin snuyanzin Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if it was missed during PTF implementation
will it work with record s?
Asking since we have a dedicated folder for java17 tests
for instance https://github.com/apache/flink/blob/1c34ca011cacdbb3b0f48b485eac89dd913d29bf/flink-tests-java17/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoRecordSerializerUpgradeTestSpecifications.java

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. So far we don't have any tests for records because the language level was too low.

@@ -1178,6 +1178,22 @@ valueconstruction:
```sql
f(columns => DESCRIPTOR(`col1`, `col2`), on_time => DESCRIPTOR(`ts`))
```
- sql: OBJECT_OF(className, [key, value [, key, value , ...]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep in mind that Chinese version also should be updated (could be in English for now)

Comment on lines +99 to +126
if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
throw new ValidationException(
"The field key at position "
+ keyIndex
+ " must be a STRING/VARCHAR type, but was "
+ logicalType.asSummaryString()
+ ".");
}
Copy link
Contributor

@snuyanzin snuyanzin Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check if it is not null (or not nullable)?
Otherwise
output for nullable classname is confusing like

SELECT OBJECT_OF(cast(null as string), 'f0', 'a', 'b', 'Alice');

gives

Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
OBJECT_OF(STRING, CHAR(2) NOT NULL, CHAR(1) NOT NULL, CHAR(1) NOT NULL, CHAR(5) NOT NULL)
...
Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments.

which in fact satisfies the required

Supported signatures are:
OBJECT_OF(STRING, [STRING, ANY]*...)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! I have addressed the nullability check and added tests.

Copy link
Contributor

@snuyanzin snuyanzin Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how will it behave for the case when arg is nullable however we can not resolve during planning like ?

  1. there is a table input with data
class_name1
class_name2
NULL
class_name3
  1. and there is a query
SELECT OBJECT_OF(class_name, 'f0', 'value') FROM input

// Test with OBJECT_OF
.testResult(
objectOf(Type1.class, "a", 42, "b", "Bob"),
"OBJECT_OF('" + Type1.class.getName() + "', 'a', 42, 'b', 'Bob')",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems subquery not supported (not sure if it is related to this PR)
this is ok

SELECT OBJECT_OF('<class_name>', 'f0', 1);

this was not able to success

SELECT OBJECT_OF('<class_name>', 'f0', (SELECT 1));

or did I miss anything?

final String fieldName =
callContext
.getArgumentValue(keyIdx, String.class)
.orElseThrow(IllegalStateException::new);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better to call CallContext#fail

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at the fail method implementation, and it throws a ValidationError. At this point in the code, we have already validated all the inputs in the ObjectOfInputTypeStrategy. Therefore, this case should (ideally) never occur, and if so, it should be an IllegalState.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Therefore, this case should (ideally) never occur, and if so, it should be an IllegalState.

i disagree
the simplest way to make it happen

SELECT OBJECT_OF('class_name', CAST(NULL AS STRING), 'as');

And then user will see just IllegalStateException without any idea what to do about it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i tend to think that classname and all keys should be always non nullable

Comment on lines +70 to +74
* <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} → {@code
* STRUCTURED<com.example.User>(name STRING, age INT)}
* <li>{@code OBJECT_OF('com.example.Point', 'x', 1.5, 'y', 2.0)} → {@code
* STRUCTURED<com.example.Point>(x DOUBLE, y DOUBLE)}
* </ul>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we have validation checking that classname should satisfy jvm rules for class naming
should we have another validation for keys in order to check that field names satisfy rules for jvm field naming?

I'm asking since it seems it is possible to have

OBJECT_OF('com.example.User', '0', 'Alice', '1', 30)

and we can not have fields called 0, and 1 in java ...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same for the reserved keywords which might be used for field names

Copy link
Author

@raminqaf raminqaf Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point... We can also validate if the field name has a space, a dot ., or a hyphen -. OBJECT_OF('com.example.User', 'field 1', 'Alice', 'field.2', 30, 'field-3', 60).

If we want to do the validation very specifically, we should also check against reserved JVM keywords (if, void, etc.).

This makes pyFlink validation complicated due to other keywords.

@twalthr what is your opinion on this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this kind of validation is needed, I will extract the fieldName validation so we can use it in OBJECT_UPDATE(object, <key>, <value>) too!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SourceVersion#isName ?
like at

private static @Nullable String checkClassName(@Nullable String className) {
if (className != null && !SourceVersion.isName(className)) {
throw new ValidationException(
String.format(
"Invalid class name '%s'. The class name must comply with JVM identifier rules.",
className));
}
return className;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants