diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java index fcb820cd25..d9546faf52 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/mysql/MysqlCDCBuilder.java @@ -187,7 +187,16 @@ public DataStreamSource build(StreamExecutionEnvironment env) { @Override public Map parseMetaDataConfig() { + boolean tinyInt1isBit = !config.getJdbc().containsKey("tinyInt1isBit") + || "true".equalsIgnoreCase(config.getJdbc().get("tinyInt1isBit")); + boolean transformedBitIsBoolean = !config.getJdbc().containsKey("transformedBitIsBoolean") + || "true".equalsIgnoreCase(config.getJdbc().get("transformedBitIsBoolean")); String url = String.format("jdbc:mysql://%s:%d/", config.getHostname(), config.getPort()); + if (tinyInt1isBit && transformedBitIsBoolean) { + url += "?tinyInt1isBit=true"; + } else { + url += "?tinyInt1isBit=false"; + } return parseMetaDataSingleConfig(url); } diff --git a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/convert/MySqlTypeConvert.java b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/convert/MySqlTypeConvert.java index c02d8c17e1..0eb939e974 100644 --- a/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/convert/MySqlTypeConvert.java +++ b/dinky-metadata/dinky-metadata-mysql/src/main/java/org/dinky/metadata/convert/MySqlTypeConvert.java @@ -78,6 +78,67 @@ private static Optional convertTinyint(Column column, DriverConfig d return Optional.of(ColumnType.INT); } + @Override + public ColumnType convert(Column column, DriverConfig driverConfig) { + ColumnType columnType = ColumnType.STRING; + if (Asserts.isNull(column)) { + return columnType; + } + Integer length = Asserts.isNull(column.getLength()) ? 0 : column.getLength(); + String t = Asserts.isNull(column.getType()) ? "" : column.getType().toLowerCase(); + boolean isNullable = !column.isKeyFlag() && column.isNullable(); + boolean tinyInt1isBit = Asserts.isNotNullString(driverConfig.getUrl()) + && !driverConfig.getUrl().contains("tinyInt1isBit=false"); + if (t.contains("numeric") || t.contains("decimal")) { + columnType = ColumnType.DECIMAL; + } else if (t.contains("bigint")) { + if (isNullable) { + columnType = ColumnType.JAVA_LANG_LONG; + } else { + columnType = ColumnType.LONG; + } + } else if (t.contains("float")) { + if (isNullable) { + columnType = ColumnType.JAVA_LANG_FLOAT; + } else { + columnType = ColumnType.FLOAT; + } + } else if (t.contains("double")) { + if (isNullable) { + columnType = ColumnType.JAVA_LANG_DOUBLE; + } else { + columnType = ColumnType.DOUBLE; + } + } else if (t.contains("boolean") + || (tinyInt1isBit && t.contains("tinyint") && length.equals(1)) + || t.contains("bit")) { + if (isNullable) { + columnType = ColumnType.JAVA_LANG_BOOLEAN; + } else { + columnType = ColumnType.BOOLEAN; + } + } else if (t.contains("datetime")) { + columnType = ColumnType.TIMESTAMP; + } else if (t.contains("date")) { + columnType = ColumnType.DATE; + } else if (t.contains("timestamp")) { + columnType = ColumnType.TIMESTAMP; + } else if (t.contains("time")) { + columnType = ColumnType.TIME; + } else if (t.contains("char") || t.contains("text")) { + columnType = ColumnType.STRING; + } else if (t.contains("binary") || t.contains("blob")) { + columnType = ColumnType.BYTES; + } else if (t.contains("tinyint") || t.contains("mediumint") || t.contains("smallint") || t.contains("int")) { + if (isNullable) { + columnType = ColumnType.INTEGER; + } else { + columnType = ColumnType.INT; + } + } + return columnType; + } + @Override public String convertToDB(ColumnType columnType) { switch (columnType) {