Skip to content

[FLINK-37913][table] Add built-in OBJECT_OF function #26704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/data/sql_functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,27 @@ valueconstruction:
```sql
f(columns => DESCRIPTOR(`col1`, `col2`), on_time => DESCRIPTOR(`ts`))
```
- sql: OBJECT_OF(className, [key, value [, key, value , ...]])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep in mind that Chinese version also should be updated (could be in English for now)

table: objectOf(STRING, Object...)
description: |
Creates a structured object from a list of key-value pairs.

The function creates an instance of a structured type identified by the given class name.
The structured type is created by providing alternating key-value pairs where keys must be
string literals and values can be arbitrary expressions.

The class name is used for type identification during planning but the actual runtime
representation is a RowData. If the class cannot be resolved, Row.class is used as fallback.

Users are responsible for providing a valid fully qualified class name that exists
in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User').
If an invalid or non-existent class name is provided, the function will fall back to using
Row.class as the type representation.

```sql
-- Creates a User object with name="Bob" and age=42
OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 42)
```
- table: NUMERIC.rows
description: Creates a NUMERIC interval of rows (commonly used in window creation).

Expand Down
2 changes: 2 additions & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Expressions
row
map_
map_from_arrays
object_of
row_interval
pi
e
Expand Down Expand Up @@ -260,6 +261,7 @@ advanced type helper functions
Expression.array_except
Expression.array_intersect
Expression.split
Expression.object_of

time definition functions
-------------------------
Expand Down
42 changes: 42 additions & 0 deletions flink-python/pyflink/table/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,48 @@ def map_from_arrays(key, value) -> Expression:
return _binary_op("mapFromArrays", key, value)


def object_of(class_name: Union[str, type], *args) -> Expression:
"""
Creates a structured object from a list of key-value pairs.

This function creates an instance of a structured type identified by the given class name.
The structured type is created by providing alternating key-value pairs where keys must be
string literals and values can be arbitrary expressions.

The class name is used for type identification during planning but the actual runtime
representation is a RowData. If the class cannot be resolved, Row.class is used as fallback.

This function corresponds to the SQL `OBJECT_OF` function.

Note: Users are responsible for providing a valid fully qualified class name that exists
in the classpath. The class name should follow Java naming conventions (e.g., 'com.example.User').
If an invalid or non-existent class name is provided, the function will fall back to using
Row.class as the type representation.

:param class_name: The fully qualified class name or class type representing the structured type
:param args: Alternating key-value pairs: key1, value1, key2, value2, ...
:return: A structured object expression

Examples:
::

>>> # Creates a User object with name="Alice" and age=30
>>> object_of("com.example.User", "name", "Alice", "age", 30)

>>> # Using a class type
>>> object_of(User, "name", "Bob", "age", 25)

.. seealso:: SQL function: OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 25)
"""
if isinstance(class_name, type):
# Convert Python class to fully qualified name
class_name_str = f"{class_name.__module__}.{class_name.__qualname__}"
else:
class_name_str = class_name

return _varargs_op("objectOf", class_name_str, *args)


def row_interval(rows: int) -> Expression:
"""
Creates an interval of rows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,82 @@ public static ApiExpression mapFromArrays(Object key, Object value) {
objectToExpression(value));
}

/**
* Creates a structured object from a list of key-value pairs.
*
* <p>This function creates an instance of a structured type identified by the given class. The
* structured type is created by providing alternating key-value pairs where keys must be string
* literals and values can be arbitrary expressions.
*
* <p>The class name is used for type identification during planning but the actual runtime
* representation is a {@code RowData}. If the class cannot be resolved, {@code Row.class} is
* used as fallback.
*
* <p><b>Note: Users are responsible for providing a valid class that exists in the classpath.
* The class should be properly accessible and follow Java naming conventions. If an invalid or
* non-existent class is provided, the function will fall back to using {@code Row.class} as the
* type representation.</b>
*
* <p>Examples:
*
* <pre>{@code
* // Creates a User object with name="Alice" and age=30
* objectOf(User.class, "name", "Alice", "age", 30)
*
* }</pre>
*
* <p>This function corresponds to the SQL {@code OBJECT_OF} function.
*
* @param clazz The class representing the structured type
* @param fields Alternating key-value pairs: key1, value1, key2, value2, ...
* @return A structured object expression
* @see #objectOf(String, Object...)
*/
public static ApiExpression objectOf(Class<?> clazz, Object... fields) {
return apiCallAtLeastOneArgument(
BuiltInFunctionDefinitions.OBJECT_OF, valueLiteral(clazz.getName()), fields);
}

/**
* Creates a structured object from a list of key-value pairs.
*
* <p>This function creates an instance of a structured type identified by the given class name.
* The structured type is created by providing alternating key-value pairs where keys must be
* string literals and values can be arbitrary expressions.
*
* <p>The class name is used for type identification during planning but the actual runtime
* representation is a {@code RowData}. If the class cannot be resolved, {@code Row.class} is
* used as fallback.
*
* <p><b>Note: Users are responsible for providing a valid fully qualified class name that
* exists in the classpath. The class name should follow Java naming conventions (e.g.,
* 'com.example.User'). If an invalid or non-existent class name is provided, the function will
* fall back to using {@code Row.class} as the type representation.</b>
*
* <p>Examples:
*
* <pre>{@code
* // Creates a User object with name="Bob" and age=25
* objectOf("com.example.User", "name", "Bob", "age", 25)
*
* }</pre>
*
* <p>This function corresponds to the SQL {@code OBJECT_OF} function:
*
* <pre>{@code
* OBJECT_OF('com.example.User', 'name', 'Bob', 'age', 25)
* }</pre>
*
* @param className The fully qualified class name representing the structured type
* @param fields Alternating key-value pairs: key1, value1, key2, value2, ...
* @return A structured object expression
* @see #objectOf(Class, Object...)
*/
public static ApiExpression objectOf(String className, Object... fields) {
return apiCallAtLeastOneArgument(
BuiltInFunctionDefinitions.OBJECT_OF, valueLiteral(className), fields);
}

/**
* Creates an interval of rows.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ public static <T> DataType STRUCTURED(Class<T> implementationClass, Field... fie
* @see DataTypes#of(Class)
* @see StructuredType
*/
public static <T> DataType STRUCTURED(String className, Field... fields) {
public static DataType STRUCTURED(String className, Field... fields) {
return buildStructuredType(StructuredType.newBuilder(className), fields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.EltFunction")
.build();

public static final BuiltInFunctionDefinition OBJECT_OF =
BuiltInFunctionDefinition.newBuilder()
.name("OBJECT_OF")
.kind(SCALAR)
.inputTypeStrategy(SpecificInputTypeStrategies.OBJECT_OF)
.outputTypeStrategy(SpecificTypeStrategies.OBJECT_OF)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.ObjectOfFunction")
.build();

// --------------------------------------------------------------------------------------------
// Math functions
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/**
* Input type strategy for the {@code OBJECT_OF} function that validates argument types and counts.
*
* <p>This strategy validates the input arguments for the {@code OBJECT_OF} function, ensuring:
*
* <ul>
* <li>The argument count is odd (className + pairs of key-value arguments)
* <li>The first argument is a STRING/VARCHAR representing the class name
* <li>All key arguments (odd positions after the first) are STRING/VARCHAR types
* <li>Field names are unique across all key-value pairs
* <li>Value arguments (even positions after the first) can be any type
* </ul>
*
* <p>The expected function signature is: {@code OBJECT_OF(className, key1, value1, key2, value2,
* ...)}
*
* <p>Validation rules:
*
* <ul>
* <li>Minimum 1 argument (just the class name)
* <li>Odd total number of arguments (className + key-value pairs)
* <li>Keys must be string literals for field name extraction
* <li>No duplicate field names allowed
* </ul>
*
* <p><b>Note: Users are responsible for providing a valid fully qualified class name that exists in
* the classpath. The class name should follow Java naming conventions. While this strategy
* validates the format and type of the class name argument, it does not verify the class existence
* in the classpath. If an invalid or non-existent class name is provided, the function will fall
* back to using Row.class as the type representation.</b>
*
* <p>Example valid calls:
*
* <ul>
* <li>{@code OBJECT_OF('com.example.User')} - empty object
* <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice')} - single field
* <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} - multiple fields
* </ul>
*
* @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF
* @see ObjectOfTypeStrategy
*/
public class ObjectOfInputTypeStrategy implements InputTypeStrategy {

private static final ArgumentCount AT_LEAST_ONE_ODD =
new ArgumentCount() {
@Override
public boolean isValidCount(int count) {
return count % 2 == 1;
}

@Override
public Optional<Integer> getMinCount() {
return Optional.of(1);
}

@Override
public Optional<Integer> getMaxCount() {
return Optional.empty();
}
};

private static void validateClassInput(
final CallContext callContext, final List<DataType> argumentDataTypes) {
final LogicalType classArgumentType = argumentDataTypes.get(0).getLogicalType();

final String errorMessage =
"The first argument must be a STRING/VARCHAR type representing the class name.";
if (!classArgumentType.is(LogicalTypeFamily.CHARACTER_STRING)) {
throw new ValidationException(errorMessage);
}

final Optional<String> className = callContext.getArgumentValue(0, String.class);
className.orElseThrow(() -> new ValidationException(errorMessage));
}

private static void validateKeyArguments(
final CallContext callContext, final List<DataType> argumentDataTypes) {
final Set<String> fieldNames = new HashSet<>();
for (int i = 1; i < argumentDataTypes.size(); i += 2) {
final LogicalType fieldNameLogicalType = argumentDataTypes.get(i).getLogicalType();
validateFieldNameInput(callContext, i, fieldNameLogicalType, fieldNames);
}
}

private static void validateFieldNameInput(
final CallContext callContext,
final int idx,
final LogicalType logicalType,
final Set<String> fieldNames) {
final int keyIndex = idx + 1;
if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) {
throw new ValidationException(
"The field key at position "
+ keyIndex
+ " must be a STRING/VARCHAR type, but was "
+ logicalType.asSummaryString()
+ ".");
}
Comment on lines +128 to +135
Copy link
Contributor

@snuyanzin snuyanzin Jun 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check if it is not null (or not nullable)?
Otherwise
output for nullable classname is confusing like

SELECT OBJECT_OF(cast(null as string), 'f0', 'a', 'b', 'Alice');

gives

Caused by: org.apache.flink.table.api.ValidationException: Invalid function call:
OBJECT_OF(STRING, CHAR(2) NOT NULL, CHAR(1) NOT NULL, CHAR(1) NOT NULL, CHAR(5) NOT NULL)
...
Caused by: org.apache.flink.table.api.ValidationException: Could not infer an output type for the given arguments.

which in fact satisfies the required

Supported signatures are:
OBJECT_OF(STRING, [STRING, ANY]*...)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! I have addressed the nullability check and added tests.

Copy link
Contributor

@snuyanzin snuyanzin Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how will it behave for the case when arg is nullable however we can not resolve during planning like ?

  1. there is a table input with data
class_name1
class_name2
NULL
class_name3
  1. and there is a query
SELECT OBJECT_OF(class_name, 'f0', 'value') FROM input

final String fieldName =
callContext
.getArgumentValue(idx, String.class)
.orElseThrow(
() ->
new ValidationException(
"Field name at position "
+ keyIndex
+ " must be a non null STRING/VARCHAR type."));

if (!fieldNames.add(fieldName)) {
throw new ValidationException(
String.format(
"The field name '%s' at position %d is repeated.",
fieldName, keyIndex));
}
}

@Override
public ArgumentCount getArgumentCount() {
return AT_LEAST_ONE_ODD;
}

@Override
public Optional<List<DataType>> inferInputTypes(
final CallContext callContext, final boolean throwOnFailure) {
final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();

validateClassInput(callContext, argumentDataTypes);
validateKeyArguments(callContext, argumentDataTypes);

return Optional.of(argumentDataTypes);
}

@Override
public List<Signature> getExpectedSignatures(final FunctionDefinition definition) {
// OBJECT_OF expects: name, key1, value1, key2, value2, ...
// OBJECT_OF(<name>, [<key>, <value> [, <key>, <value> , ...]] )
final List<Signature.Argument> arguments = new ArrayList<>();

// Class name (required)
arguments.add(Signature.Argument.of("STRING"));

// Key-value pairs (optional, repeating)
arguments.add(Signature.Argument.ofVarying("[STRING, ANY]*"));

return List.of(Signature.of(arguments));
}
}
Loading