Skip to content

Commit

Permalink
[Improve] Improve Jdbc connector error message when datatype unsuppor…
Browse files Browse the repository at this point in the history
…ted (apache#5864)
  • Loading branch information
Hisoka-X authored Nov 20, 2023
1 parent 5de87c0 commit 69f79af
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.seatunnel.common.exception;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.common.constants.PluginType;

import java.util.HashMap;
Expand All @@ -26,6 +29,8 @@
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.UNSUPPORTED_DATA_TYPE;

/**
Expand All @@ -37,6 +42,8 @@
*/
public class CommonError {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public static SeaTunnelRuntimeException unsupportedDataType(
String identifier, String dataType, String field) {
Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -83,4 +90,32 @@ public static SeaTunnelRuntimeException convertToConnectorTypeError(
params.put("field", field);
return new SeaTunnelRuntimeException(CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE, params);
}

public static SeaTunnelRuntimeException getCatalogTableWithUnsupportedType(
String catalogName, String tableName, Map<String, String> fieldWithDataTypes) {
Map<String, String> params = new HashMap<>();
params.put("catalogName", catalogName);
params.put("tableName", tableName);
try {
params.put("fieldWithDataTypes", OBJECT_MAPPER.writeValueAsString(fieldWithDataTypes));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return new SeaTunnelRuntimeException(GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR, params);
}

public static SeaTunnelRuntimeException getCatalogTablesWithUnsupportedType(
String catalogName, Map<String, Map<String, String>> tableUnsupportedTypes) {
Map<String, String> params = new HashMap<>();
params.put("catalogName", catalogName);
try {
params.put(
"tableUnsupportedTypes",
OBJECT_MAPPER.writeValueAsString(tableUnsupportedTypes));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return new SeaTunnelRuntimeException(
GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.common.exception;

/** SeaTunnel connector error code interface, it only should be invoked by {@link CommonError} */
enum CommonErrorCode implements SeaTunnelErrorCode {
/** SeaTunnel connector error code interface */
public enum CommonErrorCode implements SeaTunnelErrorCode {
UNSUPPORTED_DATA_TYPE(
"COMMON-07", "'<identifier>' unsupported data type '<dataType>' of '<field>'"),
CONVERT_TO_SEATUNNEL_TYPE_ERROR(
Expand All @@ -32,7 +32,13 @@ enum CommonErrorCode implements SeaTunnelErrorCode {
"'<connector>' <type> unsupported convert SeaTunnel data type '<dataType>' of '<field>' to connector data type."),
CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE(
"COMMON-19",
"'<identifier>' unsupported convert SeaTunnel data type '<dataType>' of '<field>' to connector data type.");
"'<identifier>' unsupported convert SeaTunnel data type '<dataType>' of '<field>' to connector data type."),
GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR(
"COMMON-20",
"'<catalogName>' table '<tableName>' unsupported get catalog table with field data types '<fieldWithDataTypes>'"),
GET_CATALOG_TABLES_WITH_UNSUPPORTED_TYPE_ERROR(
"COMMON-21",
"'<catalogName>' tables unsupported get catalog table,the corresponding field types in the following tables are not supported: '<tableUnsupportedTypes>'");

private final String code;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@

package org.apache.seatunnel.common.exception;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import java.util.HashMap;
import java.util.Map;

/** SeaTunnel global exception, used to tell user more clearly error messages */
public class SeaTunnelRuntimeException extends RuntimeException {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final SeaTunnelErrorCode seaTunnelErrorCode;
private final Map<String, String> params;

Expand Down Expand Up @@ -64,4 +71,21 @@ public SeaTunnelErrorCode getSeaTunnelErrorCode() {
public Map<String, String> getParams() {
return params;
}

public Map<String, String> getParamsValueAsMap(String key) {
try {
return OBJECT_MAPPER.readValue(
params.get(key), new TypeReference<Map<String, String>>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

public <T> T getParamsValueAs(String key) {
try {
return OBJECT_MAPPER.readValue(params.get(key), new TypeReference<T>() {});
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;

Expand All @@ -49,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -170,8 +174,23 @@ public CatalogTable getTable(TablePath tablePath)
ResultSet resultSet = ps.executeQuery()) {

TableSchema.Builder builder = TableSchema.builder();
Map<String, String> unsupported = new LinkedHashMap<>();
while (resultSet.next()) {
builder.column(buildColumn(resultSet));
try {
builder.column(buildColumn(resultSet));
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode()
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
unsupported.put(
e.getParams().get("field"), e.getParams().get("dataType"));
} else {
throw e;
}
}
}
if (!unsupported.isEmpty()) {
throw CommonError.getCatalogTableWithUnsupportedType(
catalogName, tablePath.getFullName(), unsupported);
}
// add primary key
primaryKey.ifPresent(builder::primaryKey);
Expand All @@ -186,7 +205,8 @@ public CatalogTable getTable(TablePath tablePath)
"",
catalogName);
}

} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;

Expand All @@ -40,6 +43,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -201,7 +205,7 @@ public static CatalogTable getCatalogTable(Connection connection, TablePath tabl
catalogName);
}

public static CatalogTable getCatalogTable(ResultSetMetaData resultSetMetaData)
public static CatalogTable getCatalogTable(ResultSetMetaData resultSetMetaData, String sqlQuery)
throws SQLException {
return getCatalogTable(
resultSetMetaData,
Expand All @@ -212,11 +216,13 @@ public static CatalogTable getCatalogTable(ResultSetMetaData resultSetMetaData)
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
},
sqlQuery);
}

public static CatalogTable getCatalogTable(
ResultSetMetaData metadata, JdbcDialectTypeMapper typeMapper) throws SQLException {
ResultSetMetaData metadata, JdbcDialectTypeMapper typeMapper, String sqlQuery)
throws SQLException {
return getCatalogTable(
metadata,
(BiFunction<ResultSetMetaData, Integer, Column>)
Expand All @@ -226,17 +232,32 @@ public static CatalogTable getCatalogTable(
} catch (SQLException e) {
throw new RuntimeException(e);
}
});
},
sqlQuery);
}

public static CatalogTable getCatalogTable(
ResultSetMetaData metadata,
BiFunction<ResultSetMetaData, Integer, Column> columnConverter)
BiFunction<ResultSetMetaData, Integer, Column> columnConverter,
String sqlQuery)
throws SQLException {
TableSchema.Builder schemaBuilder = TableSchema.builder();
Map<String, String> unsupported = new LinkedHashMap<>();
for (int index = 1; index <= metadata.getColumnCount(); index++) {
Column column = columnConverter.apply(metadata, index);
schemaBuilder.column(column);
try {
Column column = columnConverter.apply(metadata, index);
schemaBuilder.column(column);
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode()
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
unsupported.put(e.getParams().get("field"), e.getParams().get("dataType"));
} else {
throw e;
}
}
}
if (!unsupported.isEmpty()) {
throw CommonError.getCatalogTableWithUnsupportedType("UNKNOWN", sqlQuery, unsupported);
}
String catalogName = "jdbc_catalog";
return CatalogTable.of(
Expand All @@ -252,7 +273,7 @@ public static CatalogTable getCatalogTable(
Connection connection, String sqlQuery, JdbcDialectTypeMapper typeMapper)
throws SQLException {
try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
return getCatalogTable(ps.getMetaData(), typeMapper);
return getCatalogTable(ps.getMetaData(), typeMapper, sqlQuery);
}
}

Expand All @@ -261,7 +282,7 @@ public static CatalogTable getCatalogTable(Connection connection, String sqlQuer
ResultSetMetaData resultSetMetaData;
try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) {
resultSetMetaData = ps.getMetaData();
return getCatalogTable(resultSetMetaData);
return getCatalogTable(resultSetMetaData, sqlQuery);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
Expand Down Expand Up @@ -73,22 +76,40 @@ public static Map<TablePath, JdbcSourceTable> getTables(
log.info("Loading catalog tables for catalog : {}", jdbcCatalog.getClass());

jdbcCatalog.open();
Map<String, Map<String, String>> unsupportedTable = new HashMap<>();
for (JdbcSourceTableConfig tableConfig : tablesConfig) {
CatalogTable catalogTable =
getCatalogTable(tableConfig, jdbcCatalog, jdbcDialect);
TablePath tablePath = catalogTable.getTableId().toTablePath();
JdbcSourceTable jdbcSourceTable =
JdbcSourceTable.builder()
.tablePath(tablePath)
.query(tableConfig.getQuery())
.partitionColumn(tableConfig.getPartitionColumn())
.partitionNumber(tableConfig.getPartitionNumber())
.partitionStart(tableConfig.getPartitionStart())
.partitionEnd(tableConfig.getPartitionEnd())
.catalogTable(catalogTable)
.build();
tables.put(tablePath, jdbcSourceTable);
log.info("Loaded catalog table : {}, {}", tablePath, jdbcSourceTable);
try {
CatalogTable catalogTable =
getCatalogTable(tableConfig, jdbcCatalog, jdbcDialect);
TablePath tablePath = catalogTable.getTableId().toTablePath();
JdbcSourceTable jdbcSourceTable =
JdbcSourceTable.builder()
.tablePath(tablePath)
.query(tableConfig.getQuery())
.partitionColumn(tableConfig.getPartitionColumn())
.partitionNumber(tableConfig.getPartitionNumber())
.partitionStart(tableConfig.getPartitionStart())
.partitionEnd(tableConfig.getPartitionEnd())
.catalogTable(catalogTable)
.build();
tables.put(tablePath, jdbcSourceTable);
log.info("Loaded catalog table : {}, {}", tablePath, jdbcSourceTable);
} catch (SeaTunnelRuntimeException e) {
if (e.getSeaTunnelErrorCode()
.equals(
CommonErrorCode
.GET_CATALOG_TABLE_WITH_UNSUPPORTED_TYPE_ERROR)) {
unsupportedTable.put(
e.getParams().get("tableName"),
e.getParamsValueAsMap("fieldWithDataTypes"));
} else {
throw e;
}
}
}
if (!unsupportedTable.isEmpty()) {
throw CommonError.getCatalogTablesWithUnsupportedType(
jdbcDialect.dialectName(), unsupportedTable);
}
log.info(
"Loaded {} catalog tables for catalog : {}",
Expand Down Expand Up @@ -328,7 +349,7 @@ private static CatalogTable getCatalogTable(
ResultSetMetaData resultSetMetaData =
jdbcDialect.getResultSetMetaData(connection, sqlQuery);
return CatalogUtils.getCatalogTable(
resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper());
resultSetMetaData, jdbcDialect.getJdbcDialectTypeMapper(), sqlQuery);
}

private static Connection getConnection(JdbcConnectionConfig config, JdbcDialect jdbcDialect)
Expand Down
Loading

0 comments on commit 69f79af

Please sign in to comment.