Skip to content

Commit daf28dd

Browse files
morozovleonardBang
authored andcommitted
[FLINK-35056][cdc-connector/sqlserver] Fix scale mapping from SQL Server TIMESTAMP to Flink SQL timestamp
This closes #3561. (cherry picked from commit 52f2019)
1 parent 803d438 commit daf28dd

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ private static DataType convertFromColumn(Column column) {
7171
return DataTypes.DATE();
7272
case Types.TIMESTAMP:
7373
case Types.TIMESTAMP_WITH_TIMEZONE:
74-
return column.length() >= 0
75-
? DataTypes.TIMESTAMP(column.length())
74+
return column.scale().isPresent()
75+
? DataTypes.TIMESTAMP(column.scale().get())
7676
: DataTypes.TIMESTAMP();
7777
case Types.BOOLEAN:
7878
return DataTypes.BOOLEAN();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter;
3636
import org.apache.flink.table.api.DataTypes;
3737
import org.apache.flink.table.types.DataType;
38+
import org.apache.flink.table.types.logical.RowType;
3839

3940
import io.debezium.jdbc.JdbcConnection;
4041
import io.debezium.relational.TableId;
@@ -50,6 +51,8 @@
5051
import java.util.stream.Collectors;
5152

5253
import static org.apache.flink.cdc.connectors.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
54+
import static org.junit.Assert.assertEquals;
55+
import static org.junit.Assert.assertFalse;
5356
import static org.junit.Assert.assertNotNull;
5457
import static org.junit.Assert.assertTrue;
5558
import static org.testcontainers.containers.MSSQLServerContainer.MS_SQL_SERVER_PORT;
@@ -189,6 +192,29 @@ public void testInsertDataInSnapshotScan() throws Exception {
189192
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
190193
}
191194

195+
@Test
196+
public void testDateTimePrimaryKey() throws Exception {
197+
String databaseName = "pk";
198+
String tableName = "dbo.dt_pk";
199+
200+
initializeSqlServerTable(databaseName);
201+
202+
SqlServerSourceConfigFactory sourceConfigFactory =
203+
getConfigFactory(databaseName, new String[] {tableName}, 8096);
204+
SqlServerSourceConfig sourceConfig = sourceConfigFactory.create(0);
205+
SqlServerDialect sqlServerDialect = new SqlServerDialect(sourceConfig);
206+
207+
List<SnapshotSplit> snapshotSplits = getSnapshotSplits(sourceConfig, sqlServerDialect);
208+
assertFalse(snapshotSplits.isEmpty());
209+
210+
RowType expectedType =
211+
(RowType)
212+
DataTypes.ROW(DataTypes.FIELD("dt", DataTypes.TIMESTAMP(3).notNull()))
213+
.getLogicalType();
214+
215+
snapshotSplits.forEach(s -> assertEquals(expectedType, s.getSplitKeyType()));
216+
}
217+
192218
@Test
193219
public void testDeleteDataInSnapshotScan() throws Exception {
194220
String databaseName = "customer";
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
-- Licensed to the Apache Software Foundation (ASF) under one or more
2+
-- contributor license agreements. See the NOTICE file distributed with
3+
-- this work for additional information regarding copyright ownership.
4+
-- The ASF licenses this file to You under the Apache License, Version 2.0
5+
-- (the "License"); you may not use this file except in compliance with
6+
-- the License. You may obtain a copy of the License at
7+
--
8+
-- http://www.apache.org/licenses/LICENSE-2.0
9+
--
10+
-- Unless required by applicable law or agreed to in writing, software
11+
-- distributed under the License is distributed on an "AS IS" BASIS,
12+
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
-- See the License for the specific language governing permissions and
14+
-- limitations under the License.
15+
16+
CREATE DATABASE pk;
17+
18+
USE pk;
19+
EXEC sys.sp_cdc_enable_db;
20+
21+
CREATE TABLE dt_pk (
22+
dt datetime NOT NULL PRIMARY KEY,
23+
val INT
24+
);
25+
26+
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'dt_pk', @role_name = NULL, @supports_net_changes = 0;

0 commit comments

Comments
 (0)