Skip to content

Commit

Permalink
[Optimization-1682][metadata] Optimize setting named tinyInt1isBit fo…
Browse files Browse the repository at this point in the history
…r mysql metadata (DataLinkDC#2472)

Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Nov 1, 2023
1 parent 756d82b commit b75ed15
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,16 @@ public DataStreamSource<String> build(StreamExecutionEnvironment env) {

@Override
public Map<String, String> parseMetaDataConfig() {
boolean tinyInt1isBit = !config.getJdbc().containsKey("tinyInt1isBit")
|| "true".equalsIgnoreCase(config.getJdbc().get("tinyInt1isBit"));
boolean transformedBitIsBoolean = !config.getJdbc().containsKey("transformedBitIsBoolean")
|| "true".equalsIgnoreCase(config.getJdbc().get("transformedBitIsBoolean"));
String url = String.format("jdbc:mysql://%s:%d/", config.getHostname(), config.getPort());
if (tinyInt1isBit && transformedBitIsBoolean) {
url += "?tinyInt1isBit=true";
} else {
url += "?tinyInt1isBit=false";
}
return parseMetaDataSingleConfig(url);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,67 @@ private static Optional<ColumnType> convertTinyint(Column column, DriverConfig d
return Optional.of(ColumnType.INT);
}

@Override
public ColumnType convert(Column column, DriverConfig driverConfig) {
ColumnType columnType = ColumnType.STRING;
if (Asserts.isNull(column)) {
return columnType;
}
Integer length = Asserts.isNull(column.getLength()) ? 0 : column.getLength();
String t = Asserts.isNull(column.getType()) ? "" : column.getType().toLowerCase();
boolean isNullable = !column.isKeyFlag() && column.isNullable();
boolean tinyInt1isBit = Asserts.isNotNullString(driverConfig.getUrl())
&& !driverConfig.getUrl().contains("tinyInt1isBit=false");
if (t.contains("numeric") || t.contains("decimal")) {
columnType = ColumnType.DECIMAL;
} else if (t.contains("bigint")) {
if (isNullable) {
columnType = ColumnType.JAVA_LANG_LONG;
} else {
columnType = ColumnType.LONG;
}
} else if (t.contains("float")) {
if (isNullable) {
columnType = ColumnType.JAVA_LANG_FLOAT;
} else {
columnType = ColumnType.FLOAT;
}
} else if (t.contains("double")) {
if (isNullable) {
columnType = ColumnType.JAVA_LANG_DOUBLE;
} else {
columnType = ColumnType.DOUBLE;
}
} else if (t.contains("boolean")
|| (tinyInt1isBit && t.contains("tinyint") && length.equals(1))
|| t.contains("bit")) {
if (isNullable) {
columnType = ColumnType.JAVA_LANG_BOOLEAN;
} else {
columnType = ColumnType.BOOLEAN;
}
} else if (t.contains("datetime")) {
columnType = ColumnType.TIMESTAMP;
} else if (t.contains("date")) {
columnType = ColumnType.DATE;
} else if (t.contains("timestamp")) {
columnType = ColumnType.TIMESTAMP;
} else if (t.contains("time")) {
columnType = ColumnType.TIME;
} else if (t.contains("char") || t.contains("text")) {
columnType = ColumnType.STRING;
} else if (t.contains("binary") || t.contains("blob")) {
columnType = ColumnType.BYTES;
} else if (t.contains("tinyint") || t.contains("mediumint") || t.contains("smallint") || t.contains("int")) {
if (isNullable) {
columnType = ColumnType.INTEGER;
} else {
columnType = ColumnType.INT;
}
}
return columnType;
}

@Override
public String convertToDB(ColumnType columnType) {
switch (columnType) {
Expand Down

0 comments on commit b75ed15

Please sign in to comment.