@@ -215,25 +215,21 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
+ Preconditions.checkArgument(
+ resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
+ String.format(
+ "Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
+ lType,
+ rType,
+ resultIntDigits + resultScale,
+ DecimalType.MAX_PRECISION));
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
- DecimalType lhsDecimal = (DecimalType) lType;
- mergedType =
- DataTypes.DECIMAL(
- Math.max(
- lhsDecimal.getPrecision(),
- lhsDecimal.getScale() + getNumericPrecision(rType)),
- lhsDecimal.getScale());
+ mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
} else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
- DecimalType rhsDecimal = (DecimalType) rType;
- mergedType =
- DataTypes.DECIMAL(
- Math.max(
- rhsDecimal.getPrecision(),
- rhsDecimal.getScale() + getNumericPrecision(lType)),
- rhsDecimal.getScale());
+ mergedType = mergeExactNumericsIntoDecimal((DecimalType) rType, lType);
} else {
throw new IllegalStateException(
String.format("Incompatible types: \"%s\" and \"%s\"", lType, rType));
@@ -246,6 +242,20 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
}
}

private static DataType mergeExactNumericsIntoDecimal(
DecimalType decimalType, DataType otherType) {
int resultPrecision =
Math.max(
decimalType.getPrecision(),
decimalType.getScale() + getNumericPrecision(otherType));
Preconditions.checkArgument(
resultPrecision <= DecimalType.MAX_PRECISION,
String.format(
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
}

@VisibleForTesting
public static int getNumericPrecision(DataType dataType) {
if (dataType.is(DataTypeFamily.EXACT_NUMERIC)) {
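
An editor's aside (not part of this diff): the extracted helper keeps the DECIMAL operand's scale and only widens the precision. A minimal sketch in the style of the SchemaUtils test below, assuming getNumericPrecision(BIGINT) is 19 as in Flink's BigIntType:

// Editor's sketch, not PR code: merging DECIMAL(10, 5) with BIGINT keeps scale 5 and widens
// the precision to max(10, 5 + 19) = 24, the same DECIMAL(24, 5) step that appears later in
// the pipeline test.
Assertions.assertThat(
                SchemaUtils.inferWiderType(DataTypes.DECIMAL(10, 5), DataTypes.BIGINT()))
        .isEqualTo(DataTypes.DECIMAL(24, 5));
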
@@ -273,6 +273,23 @@ public void testInferWiderType() {
DataTypes.DECIMAL(5, 4), DataTypes.DECIMAL(10, 2)))
.isEqualTo(DataTypes.DECIMAL(12, 4));

// Test overflow decimal conversions
Assertions.assertThatThrownBy(
() ->
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
.isExactlyInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");

Assertions.assertThatThrownBy(
() ->
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
.isExactlyInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");

// Test merging with nullability
Assertions.assertThat(
SchemaUtils.inferWiderType(
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.ZonedTimestampData;
@@ -61,6 +62,7 @@

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -1216,6 +1218,88 @@ void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkA
assertThat(outputEvents).containsExactlyInAnyOrder(expected);
}

@ParameterizedTest
@EnumSource
void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();

// Setup value source
Configuration sourceConfig = new Configuration();
sourceConfig.set(
ValuesDataSourceOptions.EVENT_SET_ID,
ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);

List<Event> events = generateDecimalColumnEvents("default_table_");
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));

SourceDef sourceDef =
new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);

// Setup value sink
Configuration sinkConfig = new Configuration();
sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);

// Setup pipeline
Configuration pipelineConfig = new Configuration();
pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
pipelineConfig.set(
PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
PipelineDef pipelineDef =
new PipelineDef(
sourceDef,
sinkDef,
Collections.singletonList(
new RouteDef(
"default_namespace.default_schema.default_table_\\.*",
"default_namespace.default_schema.default_everything_merged",
null,
"Merge all decimal columns with different precision")),
Collections.emptyList(),
Collections.emptyList(),
pipelineConfig);

// Execute the pipeline
PipelineExecution execution = composer.compose(pipelineDef);

execution.execute();

// Check the order and content of all received events
String[] outputEvents = outCaptor.toString().trim().split("\n");

String[] expected =
Stream.of(
"CreateTableEvent{tableId={}, schema=columns={`id` INT,`name` STRING,`age` INT,`fav_num` TINYINT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId={}, before=[], after=[1, Alice, 17, 1], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=BIGINT}, oldTypeMapping={fav_num=TINYINT}}",
"DataChangeEvent{tableId={}, before=[], after=[2, Alice, 17, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[3, Alice, 17, 3333], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[4, Alice, 17, 44444444], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(19, 0)}, oldTypeMapping={fav_num=BIGINT}}",
"DataChangeEvent{tableId={}, before=[], after=[5, Alice, 17, 555555555555555], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(24, 5)}, oldTypeMapping={fav_num=DECIMAL(19, 0)}}",
"DataChangeEvent{tableId={}, before=[], after=[6, Alice, 17, 66666.66666], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[7, Alice, 17, 77777777.17000], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}, typeMapping={fav_num=DECIMAL(38, 19)}, oldTypeMapping={fav_num=DECIMAL(24, 5)}}",
"DataChangeEvent{tableId={}, before=[], after=[8, Alice, 17, 888888888.8888888888888888888], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[101, Zen, 19, 1.0000000000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[102, Zen, 19, 22.0000000000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[103, Zen, 19, 3333.0000000000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[104, Zen, 19, 44444444.0000000000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[105, Zen, 19, 555555555555555.0000000000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[106, Zen, 19, 66666.6666600000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[107, Zen, 19, 77777777.1700000000000000000], op=INSERT, meta=()}",
"DataChangeEvent{tableId={}, before=[], after=[108, Zen, 19, 888888888.8888888888888888888], op=INSERT, meta=()}")
.map(
s ->
s.replace(
"{}",
"default_namespace.default_schema.default_everything_merged"))
.toArray(String[]::new);

assertThat(outputEvents).containsExactlyInAnyOrder(expected);
}
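
Editor's note (not part of the PR): the expected evolution of fav_num above follows from the widening rules in the SchemaUtils hunk, assuming Flink's usual numeric precisions (TINYINT 3, SMALLINT 5, INT 10, BIGINT 19). The integer-typed tables merge to BIGINT; BIGINT merged with DECIMAL(15, 0) needs max(15, 0 + 19) = 19 digits, giving DECIMAL(19, 0); merging with DECIMAL(10, 5) keeps 19 integer digits and takes scale 5, giving DECIMAL(24, 5); DECIMAL(16, 2) fits without a change; and DECIMAL(29, 19) raises the scale to 19, giving DECIMAL(38, 19).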

private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
List<Event> events = new ArrayList<>();

@@ -1286,6 +1370,83 @@ private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
return events;
}

private List<Event> generateDecimalColumnEvents(String tableNamePrefix) {
List<Event> events = new ArrayList<>();

// Initialize schemas
List<String> names =
Arrays.asList(
"tiny",
"small",
"vanilla",
"big",
"dec_15_0",
"decimal_10_10",
"decimal_16_2",
"decimal_29_19");

List<DataType> types =
Arrays.asList(
DataTypes.TINYINT(),
DataTypes.SMALLINT(),
DataTypes.INT(),
DataTypes.BIGINT(),
DataTypes.DECIMAL(15, 0),
DataTypes.DECIMAL(10, 5),
DataTypes.DECIMAL(16, 2),
DataTypes.DECIMAL(29, 19));

List<Object> values =
Arrays.asList(
(byte) 1,
(short) 22,
3333,
(long) 44444444,
DecimalData.fromBigDecimal(new BigDecimal("555555555555555"), 15, 0),
DecimalData.fromBigDecimal(new BigDecimal("66666.66666"), 10, 5),
DecimalData.fromBigDecimal(new BigDecimal("77777777.17"), 16, 2),
DecimalData.fromBigDecimal(
new BigDecimal("888888888.8888888888888888888"), 29, 19));

List<Schema> schemas =
types.stream()
.map(
decimalColumnType ->
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("age", DataTypes.INT())
.physicalColumn("fav_num", decimalColumnType)
.primaryKey("id")
.build())
.collect(Collectors.toList());

for (int i = 0; i < names.size(); i++) {
TableId generatedTableId =
TableId.tableId(
"default_namespace", "default_schema", tableNamePrefix + names.get(i));
Schema generatedSchema = schemas.get(i);
events.add(new CreateTableEvent(generatedTableId, generatedSchema));
events.add(
DataChangeEvent.insertEvent(
generatedTableId,
generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
}

for (int i = 0; i < names.size(); i++) {
TableId generatedTableId =
TableId.tableId(
"default_namespace", "default_schema", tableNamePrefix + names.get(i));
Schema generatedSchema = schemas.get(i);
events.add(
DataChangeEvent.insertEvent(
generatedTableId,
generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
}

return events;
}

BinaryRecordData generate(Schema schema, Object... fields) {
return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
.generate(
@@ -20,6 +20,7 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.data.DecimalData;
import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.StringData;
@@ -40,6 +41,7 @@
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -74,6 +76,7 @@
import javax.annotation.Nullable;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -611,14 +614,42 @@ public Object getFieldOrNull(RecordData recordData) {
} else if (originalField instanceof Integer) {
// INT
return ((Integer) originalField).longValue();
} else if (originalField instanceof Long) {
// BIGINT
return originalField;
} else {
return fail(
new IllegalArgumentException(
String.format(
"Cannot fit type \"%s\" into a BIGINT column. "
+ "Currently only TINYINT / SMALLINT / INT can be accepted by a BIGINT column",
+ "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
originalField.getClass())));
}
} else if (destinationType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) destinationType;
BigDecimal decimalValue;
if (originalField instanceof Byte) {
decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0);
} else if (originalField instanceof Short) {
decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0);
} else if (originalField instanceof Integer) {
decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0);
} else if (originalField instanceof Long) {
decimalValue = BigDecimal.valueOf((Long) originalField, 0);
} else if (originalField instanceof DecimalData) {
decimalValue = ((DecimalData) originalField).toBigDecimal();
} else {
return fail(
new IllegalArgumentException(
String.format(
"Cannot fit type \"%s\" into a DECIMAL column. "
+ "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
Contributor:
Can we support merge FLOAT/DOUBLE to DECIMAL like Flink does?
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#casting

Member Author:
It's reasonable to have this merging path, I will try to implement it.

Member Author (@yuxiqian, Nov 12, 2024):
Unfortunately, it's not possible to determine a "wide enough" Decimal type to store FLOAT / DOUBLE losslessly (floating point numbers' range doesn't have an upper limit -- it could go up to Infinity), so maybe it's not a good idea to let this type conversion happen implicitly.

Explicit CASTing might be better, which was implemented in FLINK-34877 already.

Contributor:
Makes sense to me.

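To make the point above concrete, an editor's sketch (not part of this PR); the only value taken from the diff is DecimalType's 38-digit precision ceiling:

import java.math.BigDecimal;

public class DoubleVersusDecimalSketch {
    public static void main(String[] args) {
        // The exact decimal expansion of Double.MAX_VALUE needs roughly 309 integer digits,
        // far beyond DECIMAL's 38-digit ceiling, so no fixed DECIMAL(p, s) holds every DOUBLE.
        BigDecimal exactMax = new BigDecimal(Double.MAX_VALUE);
        System.out.println("Digits needed for Double.MAX_VALUE: " + exactMax.precision());
        System.out.println("Maximum DECIMAL precision: 38");
        // Infinity and NaN have no decimal form at all:
        // new BigDecimal(Double.POSITIVE_INFINITY) throws NumberFormatException.
    }
}

This is consistent with the diff keeping implicit widening limited to exact numerics and leaving FLOAT / DOUBLE to explicit CASTs.
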
originalField.getClass())));
}
return decimalValue != null
? DecimalData.fromBigDecimal(
decimalValue, decimalType.getPrecision(), decimalType.getScale())
: null;
} else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
if (originalField instanceof Float) {
// FLOAT