Skip to content

Commit c1153bd

Browse files
JingsongLiKurtYoung
authored andcommitted
[FLINK-12073][table-planner-blink] Support appropriate precision and scale processing of Decimal (apache#8088)
1 parent c5e4acc commit c1153bd

File tree

8 files changed

+1115
-6
lines changed

8 files changed

+1115
-6
lines changed

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@
1818

1919
package org.apache.flink.table.api
2020

21+
import org.apache.flink.table.`type`.TypeConverters.createExternalTypeInfoFromInternalType
2122
import org.apache.flink.table.calcite.FlinkTypeFactory._
2223
import org.apache.flink.table.expressions.Expression
2324
import org.apache.flink.table.functions.TemporalTableFunction
24-
import org.apache.flink.table.`type`.TypeConverters.createInternalTypeInfoFromInternalType
2525

2626
import org.apache.calcite.rel.RelNode
2727

@@ -45,7 +45,7 @@ class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends Table
4545
val fieldNames = rowType.getFieldList.map(_.getName)
4646
val fieldTypes = rowType.getFieldList map { tp =>
4747
val internalType = toInternalType(tp.getType)
48-
createInternalTypeInfoFromInternalType(internalType)
48+
createExternalTypeInfoFromInternalType(internalType)
4949
}
5050
new TableSchema(fieldNames.toArray, fieldTypes.toArray)
5151
}

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.calcite.jdbc.JavaTypeFactoryImpl
2727
import org.apache.calcite.rel.`type`._
2828
import org.apache.calcite.sql.SqlIntervalQualifier
2929
import org.apache.calcite.sql.`type`.SqlTypeName._
30-
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName}
30+
import org.apache.calcite.sql.`type`.{BasicSqlType, SqlTypeName, SqlTypeUtil}
3131
import org.apache.calcite.sql.parser.SqlParserPos
3232
import org.apache.calcite.util.ConversionUtil
3333

@@ -347,6 +347,26 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
347347
override def getDefaultCharset: Charset = {
348348
Charset.forName(ConversionUtil.NATIVE_UTF16_CHARSET_NAME)
349349
}
350+
351+
/**
352+
* Calcite's default impl for division is apparently borrowed from T-SQL,
353+
* but the details are a little different, e.g. when Decimal(34,0)/Decimal(10,0)
354+
* To avoid confusion, follow the exact T-SQL behavior.
355+
* Note that for (+-*), Calcite is also different from T-SQL;
356+
* however, Calcite conforms to SQL2003 while T-SQL does not.
357+
* therefore we keep Calcite's behavior on (+-*).
358+
*/
359+
override def createDecimalQuotient(type1: RelDataType, type2: RelDataType): RelDataType = {
360+
if (SqlTypeUtil.isExactNumeric(type1) && SqlTypeUtil.isExactNumeric(type2) &&
361+
(SqlTypeUtil.isDecimal(type1) || SqlTypeUtil.isDecimal(type2))) {
362+
val result = DecimalType.inferDivisionType(
363+
type1.getPrecision, type1.getScale,
364+
type2.getPrecision, type2.getScale)
365+
createSqlType(SqlTypeName.DECIMAL, result.precision, result.scale)
366+
} else {
367+
null
368+
}
369+
}
350370
}
351371

352372
object FlinkTypeFactory {

flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.flink.table.calcite
2020

21-
import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
21+
import org.apache.flink.table.`type`.{DecimalType, InternalType, InternalTypes}
22+
import org.apache.flink.table.typeutils.TypeCheckUtils
23+
24+
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeSystemImpl}
2225
import org.apache.calcite.sql.`type`.SqlTypeName
23-
import org.apache.flink.table.`type`.DecimalType
2426

2527
/**
2628
* Custom type system for Flink.
@@ -51,4 +53,43 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
5153
// this fixes the problem of CASE WHEN with different length string literals but get wrong
5254
// result with additional space suffix
5355
override def shouldConvertRaggedUnionTypesToVarying(): Boolean = true
56+
57+
override def deriveAvgAggType(
58+
typeFactory: RelDataTypeFactory, argType: RelDataType): RelDataType = {
59+
val argTypeInfo = FlinkTypeFactory.toInternalType(argType)
60+
val avgType = FlinkTypeSystem.deriveAvgAggType(argTypeInfo)
61+
typeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromInternalType(
62+
avgType, argType.isNullable)
63+
}
64+
65+
override def deriveSumType(
66+
typeFactory: RelDataTypeFactory, argType: RelDataType): RelDataType = {
67+
val argTypeInfo = FlinkTypeFactory.toInternalType(argType)
68+
val sumType = FlinkTypeSystem.deriveSumType(argTypeInfo)
69+
typeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromInternalType(
70+
sumType, argType.isNullable)
71+
}
72+
}
73+
74+
object FlinkTypeSystem {
75+
76+
def deriveAvgAggType(argType: InternalType): InternalType = argType match {
77+
case dt: DecimalType =>
78+
val result = DecimalType.inferAggAvgType(dt.scale())
79+
DecimalType.of(result.precision(), result.scale())
80+
case nt if TypeCheckUtils.isNumeric(nt) => InternalTypes.DOUBLE
81+
case _ =>
82+
throw new RuntimeException("Unsupported argType for AVG(): " + argType)
83+
}
84+
85+
def deriveSumType(argType: InternalType): InternalType = argType match {
86+
case dt: DecimalType =>
87+
val result = DecimalType.inferAggSumType(dt.scale())
88+
DecimalType.of(result.precision(), result.scale())
89+
case nt if TypeCheckUtils.isNumeric(nt) =>
90+
argType
91+
case _ =>
92+
throw new RuntimeException("Unsupported argType for SUM(): " + argType)
93+
}
94+
5495
}

0 commit comments

Comments
 (0)