Commit 4bd3a23

[FLINK-36565][transform] Route allows merging Decimals with various precisions

This closes #3651

1 parent 51c679a commit 4bd3a23

4 files changed: +234 −15 lines changed
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java

Lines changed: 24 additions & 14 deletions
@@ -215,25 +215,21 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                             lhsDecimal.getPrecision() - lhsDecimal.getScale(),
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+            Preconditions.checkArgument(
+                    resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
+                    String.format(
+                            "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                            lType,
+                            rType,
+                            resultIntDigits + resultScale,
+                            DecimalType.MAX_PRECISION));
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
         } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType lhsDecimal = (DecimalType) lType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    lhsDecimal.getPrecision(),
-                                    lhsDecimal.getScale() + getNumericPrecision(rType)),
-                            lhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
         } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
-            DecimalType rhsDecimal = (DecimalType) rType;
-            mergedType =
-                    DataTypes.DECIMAL(
-                            Math.max(
-                                    rhsDecimal.getPrecision(),
-                                    rhsDecimal.getScale() + getNumericPrecision(lType)),
-                            rhsDecimal.getScale());
+            mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
         } else {
             throw new IllegalStateException(
                     String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
@@ -246,6 +242,20 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         }
     }
 
+    private static DataType mergeExactNumericsIntoDecimal(
+            DecimalType decimalType, DataType otherType) {
+        int resultPrecision =
+                Math.max(
+                        decimalType.getPrecision(),
+                        decimalType.getScale() + getNumericPrecision(otherType));
+        Preconditions.checkArgument(
+                resultPrecision <= DecimalType.MAX_PRECISION,
+                String.format(
+                        "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+                        decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
+        return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
+    }
+
     @VisibleForTesting
     public static int getNumericPrecision(DataType dataType) {
         if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
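When a DECIMAL column absorbs an exact-numeric type, the new mergeExactNumericsIntoDecimal helper keeps the decimal's scale and widens the precision to max(p, s + numeric precision of the other type), under the same MAX_PRECISION guard. A hypothetical call for illustration only (the helper is private, and the direct DecimalType construction is an assumption):

    // BIGINT carries 19 numeric digits, so a DECIMAL(15, 0) column widens to
    // DECIMAL(max(15, 0 + 19), 0) = DECIMAL(19, 0), as the integration test below expects.
    DataType widened = mergeExactNumericsIntoDecimal(new DecimalType(15, 0), DataTypes.BIGINT());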

flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java

Lines changed: 17 additions & 0 deletions
@@ -273,6 +273,23 @@ public void testInferWiderType() {
                                 DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
                 .isEqualTo(DataTypes.DECIMAL(12, 4));
 
+        // Test overflow decimal conversions
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                SchemaUtils.inferWiderType(
+                                        DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
+
         // Test merging with nullability
         Assertions.assertThat(
                 SchemaUtils.inferWiderType(
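The 43 in the expected messages follows directly from the widening rule: DECIMAL(38, 0) contributes max(38, 0) = 38 integer digits, DECIMAL(5, 5) contributes max(0, 5) = 5 fractional digits, and 38 + 5 = 43 exceeds DecimalType.MAX_PRECISION (38) in either argument order.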

flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java

Lines changed: 161 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -61,6 +62,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -1216,6 +1218,88 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi)
         assertThat(outputEvents).containsExactlyInAnyOrder(expected);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        List<Event> events = generateDecimalColumnEvents("default_table_");
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.singletonList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_\\.*",
+                                        "default_namespace.default_schema.default_everything_merged",
+                                        null,
+                                        "Merge all decimal columns with different precision")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        String[] expected =
+                Stream.of(
+                                "CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=TINYINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}",
+                                "DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
+                        .map(
+                                s ->
+                                        s.replace(
+                                                "{}",
+                                                "default_namespace.default_schema.default_everything_merged"))
+                        .toArray(String[]::new);
+
+        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+    }
+
     private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         List<Event> events = new ArrayList<>();
 
@@ -1286,6 +1370,83 @@ private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         return events;
     }
 
+    private List<Event> generateDecimalColumnEvents(String tableNamePrefix) {
+        List<Event> events = new ArrayList<>();
+
+        // Initialize schemas
+        List<String> names =
+                Arrays.asList(
+                        "tiny",
+                        "small",
+                        "vanilla",
+                        "big",
+                        "dec_15_0",
+                        "decimal_10_10",
+                        "decimal_16_2",
+                        "decimal_29_19");
+
+        List<DataType> types =
+                Arrays.asList(
+                        DataTypes.TINYINT(),
+                        DataTypes.SMALLINT(),
+                        DataTypes.INT(),
+                        DataTypes.BIGINT(),
+                        DataTypes.DECIMAL(15, 0),
+                        DataTypes.DECIMAL(10, 5),
+                        DataTypes.DECIMAL(16, 2),
+                        DataTypes.DECIMAL(29, 19));
+
+        List<Object> values =
+                Arrays.asList(
+                        (byte) 1,
+                        (short) 22,
+                        3333,
+                        (long) 44444444,
+                        DecimalData.fromBigDecimal(new BigDecimal("555555555555555"), 15, 0),
+                        DecimalData.fromBigDecimal(new BigDecimal("66666.66666"), 10, 5),
+                        DecimalData.fromBigDecimal(new BigDecimal("77777777.17"), 16, 2),
+                        DecimalData.fromBigDecimal(
+                                new BigDecimal("888888888.8888888888888888888"), 29, 19));
+
+        List<Schema> schemas =
+                types.stream()
+                        .map(
+                                temporalColumnType ->
+                                        Schema.newBuilder()
+                                                .physicalColumn("id", DataTypes.INT())
+                                                .physicalColumn("name", DataTypes.STRING())
+                                                .physicalColumn("age", DataTypes.INT())
+                                                .physicalColumn("fav_num", temporalColumnType)
+                                                .primaryKey("id")
+                                                .build())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(new CreateTableEvent(generatedTableId, generatedSchema));
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
+        }
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
+        }
+
+        return events;
+    }
+
     BinaryRecordData generate(Schema schema, Object... fields) {
         return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
                 .generate(
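The expected events above trace the widening rule end to end: TINYINT, SMALLINT, and INT rows all fit once fav_num evolves to BIGINT; DECIMAL(15, 0) then widens it to DECIMAL(19, 0) (BIGINT's 19 numeric digits); DECIMAL(10, 5) raises the scale, yielding max(19, 5) integer digits plus max(0, 5) fractional digits = DECIMAL(24, 5); DECIMAL(16, 2) already fits; and DECIMAL(29, 19) pushes the type to max(19, 10) + max(5, 19) = DECIMAL(38, 19), exactly at MAX_PRECISION. The Zen rows, arriving after evolution has settled, are all re-emitted at scale 19.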

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java

Lines changed: 32 additions & 1 deletion
@@ -20,6 +20,7 @@
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.DecimalData;
 import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
@@ -40,6 +41,7 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.DecimalType;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -74,6 +76,7 @@
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -611,14 +614,42 @@ public Object getFieldOrNull(RecordData recordData) {
             } else if (originalField instanceof Integer) {
                 // INT
                 return ((Integer) originalField).longValue();
+            } else if (originalField instanceof Long) {
+                // BIGINT
+                return originalField;
             } else {
                 return fail(
                         new IllegalArgumentException(
                                 String.format(
                                         "Cannot fit type \"%s\" into a BIGINT column. "
-                                                + "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
+                                                + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
+                                        originalField.getClass())));
+            }
+        } else if (destinationType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType) destinationType;
+            BigDecimal decimalValue;
+            if (originalField instanceof Byte) {
+                decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0);
+            } else if (originalField instanceof Short) {
+                decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0);
+            } else if (originalField instanceof Integer) {
+                decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0);
+            } else if (originalField instanceof Long) {
+                decimalValue = BigDecimal.valueOf((Long) originalField, 0);
+            } else if (originalField instanceof DecimalData) {
+                decimalValue = ((DecimalData) originalField).toBigDecimal();
+            } else {
+                return fail(
+                        new IllegalArgumentException(
+                                String.format(
+                                        "Cannot fit type \"%s\" into a DECIMAL column. "
+                                                + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
                                         originalField.getClass())));
             }
+            return decimalValue != null
+                    ? DecimalData.fromBigDecimal(
+                            decimalValue, decimalType.getPrecision(), decimalType.getScale())
+                    : null;
         } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             if (originalField instanceof Float) {
                 // FLOAT
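With this DecimalType branch, rows that predate a column's evolution to DECIMAL are coerced on the fly: integral values become a BigDecimal at scale 0, existing DecimalData is unwrapped via toBigDecimal(), and the result is re-packed at the destination's precision and scale. A minimal sketch of the equivalent conversion (illustrative variable names, not committed code):

    // A BIGINT value re-routed into a column that has evolved to DECIMAL(38, 19),
    // as in the ITCase row [104, Zen, 19, 44444444.0000000000000000000]:
    BigDecimal decimalValue = BigDecimal.valueOf(44444444L, 0);
    DecimalData repacked = DecimalData.fromBigDecimal(decimalValue, 38, 19);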
