-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-37913][table] Add built-in OBJECT_OF function #26704
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
920f2e6
4a74618
73555ea
29a9380
78e4bdb
24e11e2
f629d8e
42fd488
03682df
d7424b8
40c0cd2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.types.inference.strategies; | ||
|
||
import org.apache.flink.table.api.ValidationException; | ||
import org.apache.flink.table.functions.FunctionDefinition; | ||
import org.apache.flink.table.types.DataType; | ||
import org.apache.flink.table.types.inference.ArgumentCount; | ||
import org.apache.flink.table.types.inference.CallContext; | ||
import org.apache.flink.table.types.inference.InputTypeStrategy; | ||
import org.apache.flink.table.types.inference.Signature; | ||
import org.apache.flink.table.types.logical.LogicalType; | ||
import org.apache.flink.table.types.logical.LogicalTypeFamily; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
|
||
/** | ||
* Input type strategy for the {@code OBJECT_OF} function that validates argument types and counts. | ||
* | ||
* <p>This strategy validates the input arguments for the {@code OBJECT_OF} function, ensuring: | ||
* | ||
* <ul> | ||
* <li>The argument count is odd (className + pairs of key-value arguments) | ||
* <li>The first argument is a STRING/VARCHAR representing the class name | ||
* <li>All key arguments (odd positions after the first) are STRING/VARCHAR types | ||
* <li>Field names are unique across all key-value pairs | ||
* <li>Value arguments (even positions after the first) can be any type | ||
* </ul> | ||
* | ||
* <p>The expected function signature is: {@code OBJECT_OF(className, key1, value1, key2, value2, | ||
* ...)} | ||
* | ||
* <p>Validation rules: | ||
* | ||
* <ul> | ||
* <li>Minimum 1 argument (just the class name) | ||
* <li>Odd total number of arguments (className + key-value pairs) | ||
* <li>Keys must be string literals for field name extraction | ||
* <li>No duplicate field names allowed | ||
snuyanzin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* </ul> | ||
* | ||
* <p><b>Note: Users are responsible for providing a valid fully qualified class name that exists in | ||
* the classpath. The class name should follow Java naming conventions. While this strategy | ||
* validates the format and type of the class name argument, it does not verify the class existence | ||
* in the classpath. If an invalid or non-existent class name is provided, the function will fall | ||
* back to using Row.class as the type representation.</b> | ||
* | ||
* <p>Example valid calls: | ||
* | ||
* <ul> | ||
* <li>{@code OBJECT_OF('com.example.User')} - empty object | ||
* <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice')} - single field | ||
* <li>{@code OBJECT_OF('com.example.User', 'name', 'Alice', 'age', 30)} - multiple fields | ||
* </ul> | ||
* | ||
* @see org.apache.flink.table.functions.BuiltInFunctionDefinitions#OBJECT_OF | ||
* @see ObjectOfTypeStrategy | ||
*/ | ||
public class ObjectOfInputTypeStrategy implements InputTypeStrategy { | ||
|
||
private static final ArgumentCount AT_LEAST_ONE_ODD = | ||
new ArgumentCount() { | ||
@Override | ||
public boolean isValidCount(int count) { | ||
return count % 2 == 1; | ||
} | ||
|
||
@Override | ||
public Optional<Integer> getMinCount() { | ||
return Optional.of(1); | ||
} | ||
|
||
@Override | ||
public Optional<Integer> getMaxCount() { | ||
return Optional.empty(); | ||
} | ||
}; | ||
|
||
private static void validateClassInput( | ||
final CallContext callContext, final List<DataType> argumentDataTypes) { | ||
final LogicalType classArgumentType = argumentDataTypes.get(0).getLogicalType(); | ||
|
||
final String errorMessage = | ||
"The first argument must be a STRING/VARCHAR type representing the class name."; | ||
if (!classArgumentType.is(LogicalTypeFamily.CHARACTER_STRING)) { | ||
throw new ValidationException(errorMessage); | ||
} | ||
|
||
final Optional<String> className = callContext.getArgumentValue(0, String.class); | ||
className.orElseThrow(() -> new ValidationException(errorMessage)); | ||
} | ||
|
||
private static void validateKeyArguments( | ||
final CallContext callContext, final List<DataType> argumentDataTypes) { | ||
final Set<String> fieldNames = new HashSet<>(); | ||
for (int i = 1; i < argumentDataTypes.size(); i += 2) { | ||
final LogicalType fieldNameLogicalType = argumentDataTypes.get(i).getLogicalType(); | ||
validateFieldNameInput(callContext, i, fieldNameLogicalType, fieldNames); | ||
} | ||
} | ||
|
||
private static void validateFieldNameInput( | ||
final CallContext callContext, | ||
final int idx, | ||
final LogicalType logicalType, | ||
final Set<String> fieldNames) { | ||
final int keyIndex = idx + 1; | ||
if (!logicalType.is(LogicalTypeFamily.CHARACTER_STRING)) { | ||
throw new ValidationException( | ||
"The field key at position " | ||
+ keyIndex | ||
+ " must be a STRING/VARCHAR type, but was " | ||
+ logicalType.asSummaryString() | ||
+ "."); | ||
} | ||
Comment on lines
+128
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we check if it is not null (or not nullable)? SELECT OBJECT_OF(cast(null as string), 'f0', 'a', 'b', 'Alice'); gives
which in fact satisfies the required
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the feedback! I have addressed the nullability check and added tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how will it behave for the case when arg is nullable however we can not resolve during planning like ?
SELECT OBJECT_OF(class_name, 'f0', 'value') FROM input |
||
final String fieldName = | ||
callContext | ||
.getArgumentValue(idx, String.class) | ||
.orElseThrow( | ||
() -> | ||
new ValidationException( | ||
"Field name at position " | ||
+ keyIndex | ||
+ " must be a non null STRING/VARCHAR type.")); | ||
|
||
if (!fieldNames.add(fieldName)) { | ||
throw new ValidationException( | ||
String.format( | ||
"The field name '%s' at position %d is repeated.", | ||
fieldName, keyIndex)); | ||
} | ||
} | ||
|
||
@Override | ||
public ArgumentCount getArgumentCount() { | ||
return AT_LEAST_ONE_ODD; | ||
} | ||
|
||
@Override | ||
public Optional<List<DataType>> inferInputTypes( | ||
final CallContext callContext, final boolean throwOnFailure) { | ||
final List<DataType> argumentDataTypes = callContext.getArgumentDataTypes(); | ||
|
||
validateClassInput(callContext, argumentDataTypes); | ||
validateKeyArguments(callContext, argumentDataTypes); | ||
|
||
return Optional.of(argumentDataTypes); | ||
} | ||
|
||
@Override | ||
public List<Signature> getExpectedSignatures(final FunctionDefinition definition) { | ||
// OBJECT_OF expects: name, key1, value1, key2, value2, ... | ||
// OBJECT_OF(<name>, [<key>, <value> [, <key>, <value> , ...]] ) | ||
final List<Signature.Argument> arguments = new ArrayList<>(); | ||
|
||
// Class name (required) | ||
arguments.add(Signature.Argument.of("STRING")); | ||
raminqaf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Key-value pairs (optional, repeating) | ||
arguments.add(Signature.Argument.ofVarying("[STRING, ANY]*")); | ||
|
||
return List.of(Signature.of(arguments)); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep in mind that Chinese version also should be updated (could be in English for now)