Skip to content

Commit

Permalink
Support type conversion for all scalar functions (#5849)
Browse files Browse the repository at this point in the history
Parameter classes supported for type conversion using `PinotDataType`:
- int/Integer
- long/Long
- float/Float
- double/Double
- String
- byte[]

Also handle function name with underscore in `FunctionRegistry`.

Support type conversion in every feature that uses scalar functions:
- Compile time function in `CalciteSqlParser`
- Record transform/filter during ingestion in `InbuiltFunctionEvaluator`
- Transform during query execution in `ScalarTransformFunctionWrapper`

Add `PostAggregationFunction` to handle post-aggregation calculations using scalar functions.
Add `ArithmeticFunctions` for all the arithmetic scalar functions:
- plus
- minus
- times
- divide
- mod
- min
- max
- abs
- ceil
- floor
- exp
- ln
- sqrt
  • Loading branch information
Jackie-Jiang authored Aug 13, 2020
1 parent 0b6ef98 commit 2cfaed3
Show file tree
Hide file tree
Showing 22 changed files with 845 additions and 477 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@


public class FunctionInfo {

private final Method _method;
private final Class<?> _clazz;

public FunctionInfo(Method method, Class<?> clazz) {
super();
this._method = method;
this._clazz = clazz;
method.setAccessible(true);
_method = method;
_clazz = clazz;
}

public Method getMethod() {
Expand All @@ -39,41 +38,4 @@ public Method getMethod() {
public Class<?> getClazz() {
return _clazz;
}

/**
* Check if the Function is applicable to the argumentTypes.
* We can only know the types at runtime, so we can validate if the return type is Object.
* For e.g funcA( funcB('3.14'), columnA)
* We can only know return type of funcB and 3.14 (String.class) but
* we cannot know the type of columnA in advance without knowing the source schema
* @param argumentTypes
* @return
*/
public boolean isApplicable(Class<?>[] argumentTypes) {

Class<?>[] parameterTypes = _method.getParameterTypes();

if (parameterTypes.length != argumentTypes.length) {
return false;
}

for (int i = 0; i < parameterTypes.length; i++) {
Class<?> type = parameterTypes[i];
//
if (!type.isAssignableFrom(argumentTypes[i]) && argumentTypes[i] != Object.class) {
return false;
}
}
return true;
}

/**
* Eventually we will need to convert the input datatypes before invoking the actual method. For now, there is no conversion
*
* @param args
* @return
*/
public Object[] convertTypes(Object[] args) {
return args;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,78 +18,117 @@
*/
package org.apache.pinot.common.function;

import com.google.common.base.Preconditions;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.pinot.common.utils.PinotDataType;


/**
* A simple code to invoke a method in any class using reflection.
* Eventually this will support annotations on the method but for now its a simple wrapper on any java method
* The {@code FunctionInvoker} is a wrapper on a java method which supports arguments type conversion and method
* invocation via reflection.
*/
public class FunctionInvoker {
private final Method _method;
private final Class<?>[] _parameterClasses;
private final PinotDataType[] _parameterTypes;
private final Object _instance;

private static final Logger LOGGER = LoggerFactory.getLogger(FunctionInvoker.class);
// Don't log more than 10 entries in 5 MINUTES
//TODO:Convert this functionality into a class that can be used in other places
private static long EXCEPTION_LIMIT_DURATION = TimeUnit.MINUTES.toMillis(5);
private static long EXCEPTION_LIMIT_RATE = 10;
private Method _method;
private Object _instance;
private int exceptionCount;
private long lastExceptionTime = 0;
private FunctionInfo _functionInfo;

public FunctionInvoker(FunctionInfo functionInfo)
throws Exception {
_functionInfo = functionInfo;
public FunctionInvoker(FunctionInfo functionInfo) {
_method = functionInfo.getMethod();
_method.setAccessible(true);
Class<?> clazz = functionInfo.getClazz();
Class<?>[] parameterClasses = _method.getParameterTypes();
int numParameters = parameterClasses.length;
_parameterClasses = new Class<?>[numParameters];
_parameterTypes = new PinotDataType[numParameters];
for (int i = 0; i < numParameters; i++) {
Class<?> parameterClass = parameterClasses[i];
_parameterClasses[i] = parameterClass;
_parameterTypes[i] = FunctionUtils.getParameterType(parameterClass);
}
if (Modifier.isStatic(_method.getModifiers())) {
_instance = null;
} else {
Constructor<?> constructor = clazz.getDeclaredConstructor();
constructor.setAccessible(true);
_instance = constructor.newInstance();
Class<?> clazz = functionInfo.getClazz();
try {
Constructor<?> constructor = functionInfo.getClazz().getDeclaredConstructor();
constructor.setAccessible(true);
_instance = constructor.newInstance();
} catch (Exception e) {
throw new IllegalStateException("Caught exception while constructing class: " + clazz, e);
}
}
}

public Class<?>[] getParameterTypes() {
return _method.getParameterTypes();
/**
* Returns the underlying java method.
*/
public Method getMethod() {
return _method;
}

public Class<?> getReturnType() {
return _method.getReturnType();
/**
* Returns the class of the parameters.
*/
public Class<?>[] getParameterClasses() {
return _parameterClasses;
}

public Object process(Object[] args) {
try {
return _method.invoke(_instance, _functionInfo.convertTypes(args));
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
//most likely the exception is in the udf, get the exceptio
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
}
//some udf's might be configured incorrectly and we dont want to pollute the log
//keep track of the last time an exception was logged and reset the counter if the last exception is more than the EXCEPTION_LIMIT_DURATION
if (Duration.millis(System.currentTimeMillis() - lastExceptionTime).getStandardMinutes()
> EXCEPTION_LIMIT_DURATION) {
exceptionCount = 0;
/**
* Returns the PinotDataType of the parameters for type conversion purpose. Puts {@code null} for the parameter class
* that does not support type conversion.
*/
public PinotDataType[] getParameterTypes() {
return _parameterTypes;
}

/**
* Converts the type of the given arguments to match the parameter classes. Leaves the argument as is if type
* conversion is not needed or supported.
*/
public void convertTypes(Object[] arguments) {
int numParameters = _parameterClasses.length;
Preconditions.checkArgument(arguments.length == numParameters,
"Wrong number of arguments for method: %s, expected: %s, actual: %s", _method, numParameters, arguments.length);
for (int i = 0; i < numParameters; i++) {
// Skip conversion for null
Object argument = arguments[i];
if (argument == null) {
continue;
}
if (exceptionCount < EXCEPTION_LIMIT_RATE) {
exceptionCount = exceptionCount + 1;
LOGGER.error("Exception invoking method:{} with args:{}, exception message: {}", _method.getName(),
Arrays.toString(args), cause.getMessage());
// Skip conversion if argument can be directly assigned
Class<?> parameterClass = _parameterClasses[i];
Class<?> argumentClass = argument.getClass();
if (parameterClass.isAssignableFrom(argumentClass)) {
continue;
}
return null;

PinotDataType parameterType = _parameterTypes[i];
PinotDataType argumentType = FunctionUtils.getArgumentType(argumentClass);
Preconditions.checkArgument(parameterType != null && argumentType != null,
"Cannot convert value from class: %s to class: %s", argumentClass, parameterClass);
arguments[i] = parameterType.convert(argument, argumentType);
}
}

/**
* Returns the class of the result value.
*/
public Class<?> getResultClass() {
return _method.getReturnType();
}

/**
* Invoke the function with the given arguments. The arguments should match the parameter classes. Use
* {@link #convertTypes(Object[])} to convert the argument types if needed before calling this method.
*/
public Object invoke(Object[] arguments) {
try {
return _method.invoke(_instance, arguments);
} catch (Exception e) {
throw new IllegalStateException(
"Caught exception while invoking method: " + _method + " with arguments: " + Arrays.toString(arguments), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.function.annotations.ScalarFunction;
import org.reflections.Reflections;
import org.reflections.scanners.MethodAnnotationsScanner;
Expand Down Expand Up @@ -104,6 +105,6 @@ public static FunctionInfo getFunctionByName(String functionName) {
}

private static String canonicalize(String functionName) {
return functionName.toLowerCase();
return StringUtils.remove(functionName, '_').toLowerCase();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.function;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.spi.data.FieldSpec.DataType;


/**
 * Static lookup helpers that map a Java class to the matching Pinot type descriptor
 * ({@link PinotDataType}, {@link DataType} or {@link ColumnDataType}).
 *
 * <p>All lookups are exact-class lookups: a class that is not registered in the corresponding table yields
 * {@code null}; no subtype or widening match is attempted.
 */
public class FunctionUtils {
  private FunctionUtils() {
  }

  // NOTE: The tables below are populated in static initializer blocks rather than with double-brace initialization,
  // which would create one anonymous HashMap subclass per table. The tables are private and never modified after
  // class initialization.

  // Types allowed as the function parameter (in the function signature) for type conversion
  private static final Map<Class<?>, PinotDataType> PARAMETER_TYPE_MAP = new HashMap<>();

  static {
    PARAMETER_TYPE_MAP.put(int.class, PinotDataType.INTEGER);
    PARAMETER_TYPE_MAP.put(Integer.class, PinotDataType.INTEGER);
    PARAMETER_TYPE_MAP.put(long.class, PinotDataType.LONG);
    PARAMETER_TYPE_MAP.put(Long.class, PinotDataType.LONG);
    PARAMETER_TYPE_MAP.put(float.class, PinotDataType.FLOAT);
    PARAMETER_TYPE_MAP.put(Float.class, PinotDataType.FLOAT);
    PARAMETER_TYPE_MAP.put(double.class, PinotDataType.DOUBLE);
    PARAMETER_TYPE_MAP.put(Double.class, PinotDataType.DOUBLE);
    PARAMETER_TYPE_MAP.put(String.class, PinotDataType.STRING);
    PARAMETER_TYPE_MAP.put(byte[].class, PinotDataType.BYTES);
  }

  // Types allowed as the function argument (actual value passed into the function) for type conversion.
  // Only boxed/reference classes are registered here (no primitive classes).
  private static final Map<Class<?>, PinotDataType> ARGUMENT_TYPE_MAP = new HashMap<>();

  static {
    ARGUMENT_TYPE_MAP.put(Byte.class, PinotDataType.BYTE);
    ARGUMENT_TYPE_MAP.put(Boolean.class, PinotDataType.BOOLEAN);
    ARGUMENT_TYPE_MAP.put(Character.class, PinotDataType.CHARACTER);
    ARGUMENT_TYPE_MAP.put(Short.class, PinotDataType.SHORT);
    ARGUMENT_TYPE_MAP.put(Integer.class, PinotDataType.INTEGER);
    ARGUMENT_TYPE_MAP.put(Long.class, PinotDataType.LONG);
    ARGUMENT_TYPE_MAP.put(Float.class, PinotDataType.FLOAT);
    ARGUMENT_TYPE_MAP.put(Double.class, PinotDataType.DOUBLE);
    ARGUMENT_TYPE_MAP.put(String.class, PinotDataType.STRING);
    ARGUMENT_TYPE_MAP.put(byte[].class, PinotDataType.BYTES);
  }

  // Java class (primitive or boxed) to the corresponding DataType
  private static final Map<Class<?>, DataType> DATA_TYPE_MAP = new HashMap<>();

  static {
    DATA_TYPE_MAP.put(int.class, DataType.INT);
    DATA_TYPE_MAP.put(Integer.class, DataType.INT);
    DATA_TYPE_MAP.put(long.class, DataType.LONG);
    DATA_TYPE_MAP.put(Long.class, DataType.LONG);
    DATA_TYPE_MAP.put(float.class, DataType.FLOAT);
    DATA_TYPE_MAP.put(Float.class, DataType.FLOAT);
    DATA_TYPE_MAP.put(double.class, DataType.DOUBLE);
    DATA_TYPE_MAP.put(Double.class, DataType.DOUBLE);
    DATA_TYPE_MAP.put(String.class, DataType.STRING);
    DATA_TYPE_MAP.put(byte[].class, DataType.BYTES);
  }

  // Java class (primitive or boxed) to the corresponding ColumnDataType
  private static final Map<Class<?>, ColumnDataType> COLUMN_DATA_TYPE_MAP = new HashMap<>();

  static {
    COLUMN_DATA_TYPE_MAP.put(int.class, ColumnDataType.INT);
    COLUMN_DATA_TYPE_MAP.put(Integer.class, ColumnDataType.INT);
    COLUMN_DATA_TYPE_MAP.put(long.class, ColumnDataType.LONG);
    COLUMN_DATA_TYPE_MAP.put(Long.class, ColumnDataType.LONG);
    COLUMN_DATA_TYPE_MAP.put(float.class, ColumnDataType.FLOAT);
    COLUMN_DATA_TYPE_MAP.put(Float.class, ColumnDataType.FLOAT);
    COLUMN_DATA_TYPE_MAP.put(double.class, ColumnDataType.DOUBLE);
    COLUMN_DATA_TYPE_MAP.put(Double.class, ColumnDataType.DOUBLE);
    COLUMN_DATA_TYPE_MAP.put(String.class, ColumnDataType.STRING);
    COLUMN_DATA_TYPE_MAP.put(byte[].class, ColumnDataType.BYTES);
  }

  /**
   * Returns the corresponding PinotDataType for the given parameter class, or {@code null} if the class is not
   * supported as a parameter type for type conversion.
   *
   * @param clazz class of a parameter in the function signature
   * @return the matching PinotDataType, or {@code null} if there is no match
   */
  @Nullable
  public static PinotDataType getParameterType(Class<?> clazz) {
    return PARAMETER_TYPE_MAP.get(clazz);
  }

  /**
   * Returns the corresponding PinotDataType for the given argument class, or {@code null} if the class is not
   * supported as an argument type for type conversion.
   *
   * @param clazz class of a value passed into the function
   * @return the matching PinotDataType, or {@code null} if there is no match
   */
  @Nullable
  public static PinotDataType getArgumentType(Class<?> clazz) {
    return ARGUMENT_TYPE_MAP.get(clazz);
  }

  /**
   * Returns the corresponding DataType for the given class, or {@code null} if there is no match.
   *
   * @param clazz class to look up
   * @return the matching DataType, or {@code null} if there is no match
   */
  @Nullable
  public static DataType getDataType(Class<?> clazz) {
    return DATA_TYPE_MAP.get(clazz);
  }

  /**
   * Returns the corresponding ColumnDataType for the given class, or {@code null} if there is no match.
   *
   * @param clazz class to look up
   * @return the matching ColumnDataType, or {@code null} if there is no match
   */
  @Nullable
  public static ColumnDataType getColumnDataType(Class<?> clazz) {
    return COLUMN_DATA_TYPE_MAP.get(clazz);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,26 @@


/**
* Annotation Class for Scalar Functions
* Methods annotated using the interface are registered in the FunctionsRegistry
* and can be used as UDFs during Querying
* Annotation Class for Scalar Functions.
*
* Methods annotated using the interface are registered in the FunctionsRegistry, and can be used for transform and
* filtering during record ingestion, and transform and post-aggregation during query execution.
*
* NOTE:
* 1. The annotated method must be in a package matching 'org.apache.pinot.*.function.*' to be auto-registered.
* 2. The following parameter types are supported for auto type conversion:
* - int/Integer
* - long/Long
* - float/Float
* - double/Double
* - String
* - byte[]
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ScalarFunction {
boolean enabled() default true;
String name() default "";

boolean enabled() default true;

String name() default "";
}
Loading

0 comments on commit 2cfaed3

Please sign in to comment.