diff --git a/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchColumnException.java b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchColumnException.java new file mode 100644 index 00000000000..c149b44026f --- /dev/null +++ b/api/src/main/java/com/datastrato/gravitino/exceptions/NoSuchColumnException.java @@ -0,0 +1,17 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.exceptions; + +/** Exception thrown when a column with specified name is not existed. */ +public class NoSuchColumnException extends NotFoundException { + + public NoSuchColumnException(String message) { + super(message); + } + + public NoSuchColumnException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcColumn.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcColumn.java index 89d84e9fba5..fced85a324f 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcColumn.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcColumn.java @@ -5,17 +5,24 @@ package com.datastrato.gravitino.catalog.jdbc; import com.datastrato.gravitino.catalog.rel.BaseColumn; +import java.util.List; /** Represents a column in the Jdbc column. */ public class JdbcColumn extends BaseColumn { private String defaultValue; + private List properties; + private JdbcColumn() {} public String getDefaultValue() { return defaultValue; } + public List getProperties() { + return properties; + } + /** A builder class for constructing JdbcColumn instances. */ public static class Builder extends BaseColumnBuilder { @@ -24,11 +31,19 @@ public static class Builder extends BaseColumnBuilder { */ private String defaultValue; + /** Attribute value of the field, such as AUTO_INCREMENT, PRIMARY KEY, etc. */ + private List properties; + public Builder withDefaultValue(String defaultValue) { this.defaultValue = defaultValue; return this; } + public Builder withProperties(List properties) { + this.properties = properties; + return this; + } + /** * Internal method to build a JdbcColumn instance using the provided values. * @@ -42,6 +57,7 @@ protected JdbcColumn internalBuild() { jdbcColumn.dataType = dataType; jdbcColumn.nullable = nullable; jdbcColumn.defaultValue = defaultValue; + jdbcColumn.properties = properties; return jdbcColumn; } } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java index 3021807649a..97b6f61d0aa 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java @@ -10,7 +10,11 @@ import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import javax.sql.DataSource; import org.slf4j.Logger; @@ -54,12 +58,28 @@ public void delete(String databaseName, boolean cascade) throws NoSuchSchemaExce } } + @Override + public List list() { + List tableNames = new ArrayList<>(); + try (final Connection connection = this.dataSource.getConnection()) { + DatabaseMetaData metaData = connection.getMetaData(); + ResultSet resultSet = metaData.getTables(null, null, null, JdbcConnectorUtils.TABLE_TYPES); + while (resultSet.next()) { + String tableName = resultSet.getString("TABLE_NAME"); + tableNames.add(tableName); + } + return tableNames; + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + /** * @param databaseName The name of the database. * @param comment The comment of the database. * @return the SQL statement to create a database with the given name and comment. */ - abstract String generateCreateDatabaseSql( + public abstract String generateCreateDatabaseSql( String databaseName, String comment, Map properties); /** @@ -67,5 +87,5 @@ abstract String generateCreateDatabaseSql( * @param cascade cascade If set to true, drops all the tables in the schema as well. * @return the SQL statement to drop a database with the given name. */ - abstract String generateDropDatabaseSql(String databaseName, boolean cascade); + public abstract String generateDropDatabaseSql(String databaseName, boolean cascade); } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java index d7db148bedf..46f01b82f5a 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java @@ -2,7 +2,6 @@ * Copyright 2023 Datastrato. * This software is licensed under the Apache License version 2. */ - package com.datastrato.gravitino.catalog.jdbc.operation; import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; @@ -32,8 +31,6 @@ public abstract class JdbcTableOperations implements TableOperation { protected static final Logger LOG = LoggerFactory.getLogger(JdbcTableOperations.class); - protected static final String[] TABLE_TYPES = {"TABLE"}; - protected DataSource dataSource; protected JdbcExceptionConverter exceptionMapper; protected JdbcTypeConverter typeConverter; @@ -111,14 +108,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE try (ResultSet column = getColumns(connection, tableName)) { List result = new ArrayList<>(); while (column.next()) { - result.add( - new JdbcColumn.Builder() - .withName(column.getString("COLUMN_NAME")) - .withComment(column.getString("REMARKS")) - .withType(typeConverter.toGravitinoType(column.getString("TYPE_NAME"))) - .withNullable(column.getInt("NULLABLE") == DatabaseMetaData.columnNullable) - .withDefaultValue(column.getString("COLUMN_DEF")) - .build()); + result.add(extractJdbcColumnFromResultSet(column)); } jdbcColumns = result.toArray(new JdbcColumn[0]); } @@ -157,7 +147,8 @@ public void alterTable(String databaseName, String tableName, TableChange... cha throws NoSuchTableException { LOG.info("Attempting to alter table {} from database {}", tableName, databaseName); try (Connection connection = getConnection(databaseName)) { - JdbcConnectorUtils.executeUpdate(connection, generateAlterTableSql(tableName, changes)); + JdbcConnectorUtils.executeUpdate( + connection, generateAlterTableSql(databaseName, tableName, changes)); LOG.info("Alter table {} from database {}", tableName, databaseName); } catch (final SQLException se) { throw this.exceptionMapper.toGravitinoException(se); @@ -178,13 +169,14 @@ public void purge(String databaseName, String tableName) throws NoSuchTableExcep protected ResultSet getTables(Connection connection) throws SQLException { final DatabaseMetaData metaData = connection.getMetaData(); String databaseName = connection.getSchema(); - return metaData.getTables(databaseName, databaseName, null, TABLE_TYPES); + return metaData.getTables(databaseName, databaseName, null, JdbcConnectorUtils.TABLE_TYPES); } protected ResultSet getTable(Connection connection, String tableName) throws SQLException { final DatabaseMetaData metaData = connection.getMetaData(); String databaseName = connection.getSchema(); - return metaData.getTables(databaseName, databaseName, tableName, TABLE_TYPES); + return metaData.getTables( + databaseName, databaseName, tableName, JdbcConnectorUtils.TABLE_TYPES); } protected ResultSet getColumns(Connection connection, String tableName) throws SQLException { @@ -199,6 +191,9 @@ protected ResultSet getColumns(Connection connection, String tableName) throws S */ protected abstract Map extractPropertiesFromResultSet(ResultSet table); + protected abstract JdbcColumn extractJdbcColumnFromResultSet(ResultSet column) + throws SQLException; + protected abstract String generateCreateTableSql( String tableName, JdbcColumn[] columns, @@ -212,7 +207,8 @@ protected abstract String generateCreateTableSql( protected abstract String generatePurgeTableSql(String tableName); - protected abstract String generateAlterTableSql(String tableName, TableChange... changes); + protected abstract String generateAlterTableSql( + String databaseName, String tableName, TableChange... changes); protected Connection getConnection(String schema) throws SQLException { Connection connection = dataSource.getConnection(); diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java index 5ece0e3bf01..8f232eeb2a5 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java @@ -10,6 +10,8 @@ public final class JdbcConnectorUtils { + public static final String[] TABLE_TYPES = {"TABLE"}; + private JdbcConnectorUtils() {} /** diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java index df8fdd9510d..337e47db577 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java @@ -81,13 +81,13 @@ public void delete(String databaseName) { } @Override - String generateCreateDatabaseSql( + public String generateCreateDatabaseSql( String databaseName, String comment, Map properties) { return null; } @Override - String generateDropDatabaseSql(String databaseName, boolean cascade) { + public String generateDropDatabaseSql(String databaseName, boolean cascade) { return null; } } diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java index 89b1f8bef35..4be8cfad0c4 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteTableOperations.java @@ -8,6 +8,7 @@ import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Collections; import java.util.Map; @@ -68,10 +69,22 @@ protected String generatePurgeTableSql(String tableName) { } @Override - protected String generateAlterTableSql(String tableName, TableChange... changes) { + protected String generateAlterTableSql( + String databaseName, String tableName, TableChange... changes) { throw new UnsupportedOperationException("Alter table is not supported in sqlite."); } + @Override + protected JdbcColumn extractJdbcColumnFromResultSet(ResultSet resultSet) throws SQLException { + return new JdbcColumn.Builder() + .withName(resultSet.getString("COLUMN_NAME")) + .withComment(null) + .withType(typeConverter.toGravitinoType(resultSet.getString("TYPE_NAME"))) + .withNullable(resultSet.getBoolean("NULLABLE")) + .withDefaultValue(resultSet.getString("COLUMN_DEF")) + .build(); + } + @Override protected Map extractPropertiesFromResultSet(ResultSet table) { // Sqlite does not support table properties. diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java index c048e957412..50b3ae7a034 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java @@ -129,7 +129,7 @@ public void testOperationTable() { JdbcTable loadTable = JDBC_TABLE_OPERATIONS.load(DATABASE_NAME, table1); Assertions.assertNotNull(loadTable); Assertions.assertEquals(table1, loadTable.name()); - Assertions.assertEquals(null, loadTable.comment()); + Assertions.assertNull(loadTable.comment()); Assertions.assertEquals(properties, loadTable.properties()); Assertions.assertEquals(jdbcColumns.length, loadTable.columns().length); Map createColumnMap = @@ -140,7 +140,7 @@ public void testOperationTable() { Assertions.assertEquals(jdbcColumn.name(), column.name()); Assertions.assertEquals(jdbcColumn.comment(), column.comment()); Assertions.assertEquals(jdbcColumn.dataType(), column.dataType()); - Assertions.assertEquals(jdbcColumn.nullable(), ((JdbcColumn) column).nullable()); + Assertions.assertEquals(jdbcColumn.nullable(), column.nullable()); Assertions.assertEquals( jdbcColumn.getDefaultValue(), ((JdbcColumn) column).getDefaultValue()); } diff --git a/catalogs/catalog-jdbc-mysql/build.gradle.kts b/catalogs/catalog-jdbc-mysql/build.gradle.kts new file mode 100644 index 00000000000..c9c2b691f25 --- /dev/null +++ b/catalogs/catalog-jdbc-mysql/build.gradle.kts @@ -0,0 +1,31 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +description = "catalog-jdbc-mysql" + +plugins { + `maven-publish` + id("java") + id("idea") + id("com.diffplug.spotless") +} + +dependencies { + implementation(project(":common")) + implementation(project(":core")) + implementation(project(":api")) + implementation(project(":catalogs:catalog-jdbc-common")) + implementation(libs.bundles.log4j) + implementation(libs.commons.lang3) + implementation(libs.commons.collections4) + implementation(libs.jsqlparser) + implementation(libs.substrait.java.core) { + exclude("com.fasterxml.jackson.core") + exclude("com.fasterxml.jackson.datatype") + exclude("com.fasterxml.jackson.dataformat") + exclude("com.google.protobuf") + exclude("com.google.code.findbugs") + exclude("org.slf4j") + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlCatalog.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlCatalog.java new file mode 100644 index 00000000000..ec220f1b112 --- /dev/null +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/MysqlCatalog.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.mysql; + +import com.datastrato.gravitino.catalog.jdbc.JdbcCatalog; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import com.datastrato.gravitino.catalog.mysql.converter.MysqlExceptionConverter; +import com.datastrato.gravitino.catalog.mysql.converter.MysqlTypeConverter; +import com.datastrato.gravitino.catalog.mysql.operation.MysqlDatabaseOperations; +import com.datastrato.gravitino.catalog.mysql.operation.MysqlTableOperations; + +/** Implementation of an Jdbc catalog in Gravitino. */ +public abstract class MysqlCatalog extends JdbcCatalog { + + @Override + public String shortName() { + return "mysql"; + } + + @Override + protected JdbcExceptionConverter createExceptionConverter() { + return new MysqlExceptionConverter(); + } + + @Override + protected JdbcTypeConverter createJdbcTypeConverter() { + return new MysqlTypeConverter(); + } + + @Override + protected JdbcDatabaseOperations createJdbcDatabaseOperations() { + return new MysqlDatabaseOperations(); + } + + @Override + protected JdbcTableOperations createJdbcTableOperations() { + return new MysqlTableOperations(); + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/converter/MysqlExceptionConverter.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/converter/MysqlExceptionConverter.java new file mode 100644 index 00000000000..ca13bee0571 --- /dev/null +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/converter/MysqlExceptionConverter.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.mysql.converter; + +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import java.sql.SQLException; + +/** Exception converter to gravitino exception for MySQL. */ +public class MysqlExceptionConverter extends JdbcExceptionConverter { + + @Override + public GravitinoRuntimeException toGravitinoException(SQLException se) { + switch (se.getErrorCode()) { + case 1007: + return new SchemaAlreadyExistsException(se.getMessage(), se); + case 1050: + return new TableAlreadyExistsException(se.getMessage(), se); + case 1008: + return new NoSuchSchemaException(se.getMessage(), se); + case 1146: + case 1051: + return new NoSuchTableException(se.getMessage(), se); + default: + return new GravitinoRuntimeException(se.getMessage(), se); + } + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/converter/MysqlTypeConverter.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/converter/MysqlTypeConverter.java new file mode 100644 index 00000000000..6526bd8d8d1 --- /dev/null +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/converter/MysqlTypeConverter.java @@ -0,0 +1,97 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.mysql.converter; + +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import io.substrait.type.Type; +import io.substrait.type.TypeCreator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** Type converter for MySQL. */ +public class MysqlTypeConverter extends JdbcTypeConverter { + + private static final Pattern TYPE_PATTERN = + Pattern.compile( + "^\\s*?" + + "(\\w+(?:\\s(?:precision|varying))?)" // group 0 + + "\\s*?" + + "(?:\\(\\s*?(\\d+)(?:\\s*?,\\s*?(\\d+))?\\s*?\\))?" // group 1 and 2 + + "\\s*?" + + "(\\[\\](?:\\[\\])?)?" // group 3 + + "(?:\\s*?(\\w+(?:\\s\\w+)*))?$" // group 4 + ); + + @Override + public Type toGravitinoType(String type) { + final String lowerType = type.toLowerCase(); + + // Split up the possible type: TYPE[(size, magnitude)] EXTRA + final String[] splitType = this.splitType(lowerType); + switch (splitType[0]) { + // TODO Waiting for mchades to implement our type system. + case "int": + case "integer": + return TypeCreator.NULLABLE.I32; + case "varchar": + return TypeCreator.NULLABLE.STRING; + default: + throw new UnsupportedOperationException("Not a supported type: " + type); + } + } + + @Override + public String fromGravitinoType(Type type) { + if (type instanceof io.substrait.type.Type.Bool) { + return "boolean"; + } else if (type instanceof io.substrait.type.Type.I8) { + return "int(8)"; + } else if (type instanceof io.substrait.type.Type.I16) { + return "int(16)"; + } else if (type instanceof io.substrait.type.Type.I32) { + return "int(32)"; + } else if (type instanceof io.substrait.type.Type.I64) { + return "BIGINT"; + } else if (type instanceof io.substrait.type.Type.FP32) { + return "FLOAT"; + } else if (type instanceof io.substrait.type.Type.FP64) { + return "DOUBLE"; + } else if (type instanceof io.substrait.type.Type.Str + || type instanceof io.substrait.type.Type.VarChar) { + return "VARCHAR(255)"; + } else if (type instanceof io.substrait.type.Type.Date) { + return "DATE"; + } else if (type instanceof io.substrait.type.Type.Time) { + return "Time"; + } else if (type instanceof io.substrait.type.Type.Timestamp) { + return "TIMESTAMP"; + } else if (type instanceof io.substrait.type.Type.Decimal) { + return "DECIMAL(" + + ((io.substrait.type.Type.Decimal) type).precision() + + "," + + ((io.substrait.type.Type.Decimal) type).scale() + + ")"; + } else if (type instanceof io.substrait.type.Type.FixedChar) { + return "char(" + ((io.substrait.type.Type.FixedChar) type).length() + ")"; + } else if (type instanceof io.substrait.type.Type.Binary) { + return "binary"; + } + throw new UnsupportedOperationException("Not a supported type: " + type.toString()); + } + + protected String[] splitType(final String type) { + final Matcher matcher = TYPE_PATTERN.matcher(type); + final int numGroups = matcher.groupCount(); + if (matcher.find()) { + final String[] split = new String[numGroups]; + for (int i = 0; i < numGroups; i++) { + split[i] = matcher.group(i + 1); + } + return split; + } else { + throw new IllegalArgumentException("Unable to parse " + type); + } + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlDatabaseOperations.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlDatabaseOperations.java new file mode 100644 index 00000000000..f39203b99f9 --- /dev/null +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlDatabaseOperations.java @@ -0,0 +1,85 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.mysql.operation; + +import com.datastrato.gravitino.catalog.jdbc.JdbcSchema; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; + +/** Database operations for MySQL. */ +public class MysqlDatabaseOperations extends JdbcDatabaseOperations { + @Override + public String generateCreateDatabaseSql( + String databaseName, String comment, Map properties) { + StringBuilder sqlBuilder = new StringBuilder("CREATE DATABASE "); + + // Append database name + sqlBuilder.append("`").append(databaseName).append("`"); + if (StringUtils.isNotEmpty(comment)) { + throw new UnsupportedOperationException( + "mysql does not support comment option on database create."); + } + // Append options + if (MapUtils.isNotEmpty(properties)) { + sqlBuilder.append("\n"); + sqlBuilder.append( + properties.entrySet().stream() + .map(entry -> entry.getKey() + " " + entry.getValue()) + .collect(Collectors.joining("\n"))); + } + return sqlBuilder.toString(); + } + + @Override + public String generateDropDatabaseSql(String databaseName, boolean cascade) { + if (cascade) { + throw new UnsupportedOperationException( + "MySQL does not support CASCADE option for DROP DATABASE."); + } + return "DROP DATABASE `" + databaseName + "`"; + } + + @Override + public JdbcSchema load(String databaseName) throws NoSuchSchemaException { + try (final Connection connection = this.dataSource.getConnection()) { + String query = "SELECT * FROM information_schema.SCHEMATA WHERE SCHEMA_NAME = ?"; + try (PreparedStatement preparedStatement = connection.prepareStatement(query)) { + preparedStatement.setString(1, databaseName); + + // Execute the query + try (ResultSet resultSet = preparedStatement.executeQuery()) { + if (!resultSet.next()) { + throw new NoSuchSchemaException("Database not found: information_schema.SCHEMATA"); + } + String schemaName = resultSet.getString("SCHEMA_NAME"); + // Mysql currently only supports these two attributes + String characterSetName = resultSet.getString("DEFAULT_CHARACTER_SET_NAME"); + String collationName = resultSet.getString("DEFAULT_COLLATION_NAME"); + return new JdbcSchema.Builder() + .withName(schemaName) + .withProperties( + new HashMap() { + { + put("CHARACTER SET", characterSetName); + put("COLLATE", collationName); + } + }) + .build(); + } + } + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } +} diff --git a/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java new file mode 100644 index 00000000000..374c5c9a1ca --- /dev/null +++ b/catalogs/catalog-jdbc-mysql/src/main/java/com/datastrato/gravitino/catalog/mysql/operation/MysqlTableOperations.java @@ -0,0 +1,547 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.mysql.operation; + +import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; +import com.datastrato.gravitino.catalog.jdbc.JdbcTable; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import com.datastrato.gravitino.exceptions.NoSuchColumnException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.rel.TableChange; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.StringJoiner; +import java.util.stream.Collectors; +import net.sf.jsqlparser.JSQLParserException; +import net.sf.jsqlparser.parser.CCJSqlParserUtil; +import net.sf.jsqlparser.statement.create.table.ColumnDefinition; +import net.sf.jsqlparser.statement.create.table.CreateTable; +import net.sf.jsqlparser.statement.create.table.Index; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; + +/** Table operations for MySQL. */ +public class MysqlTableOperations extends JdbcTableOperations { + + public static final String AUTO_INCREMENT = "AUTO_INCREMENT"; + public static final String PRIMARY_KEY = "PRIMARY KEY"; + + private static final String COMMENT = "COMMENT"; + private static final String SPACE = " "; + + private static final String NOT_NULL = "NOT NULL"; + private static final String DEFAULT = "DEFAULT"; + + @Override + public JdbcTable load(String databaseName, String tableName) throws NoSuchTableException { + CreateTable createTable = loadCreateTable(databaseName, tableName); + List jdbcColumns = new ArrayList<>(); + // Assemble index information. + Map> indexGroupByName = + getIndexNameGroupByColumnName(createTable.getIndexes()); + for (ColumnDefinition columnDefinition : createTable.getColumnDefinitions()) { + // Assemble column information. + String columnName = columnDefinition.getColumnName().replaceAll("`", ""); + String columnType = columnDefinition.getColDataType().toString(); + String[] columnSpecs = columnDefinition.getColumnSpecs().toArray(new String[0]); + String columnProperties = String.join(SPACE, columnSpecs); + boolean nullable = !columnProperties.contains(NOT_NULL); + String defaultValue = findPropertiesValue(columnSpecs, DEFAULT); + String comment = findPropertiesValue(columnSpecs, COMMENT); + List properties = getColumnProperties(columnProperties); + Optional.ofNullable(indexGroupByName.get(columnName)).ifPresent(properties::addAll); + jdbcColumns.add( + new JdbcColumn.Builder() + .withName(columnName) + .withType(typeConverter.toGravitinoType(columnType)) + .withNullable(nullable) + .withComment(comment) + .withDefaultValue("NULL".equals(defaultValue) ? null : defaultValue) + .withProperties(properties) + .build()); + } + Map properties = + parseOrderedKeyValuePairs(createTable.getTableOptionsStrings().toArray(new String[0])); + + String remove = properties.remove(COMMENT); + return new JdbcTable.Builder() + .withName(tableName) + .withColumns(jdbcColumns.toArray(new JdbcColumn[0])) + .withComment(remove) + .withProperties(properties) + .build(); + } + + private JdbcColumn getJdbcColumnFromCreateTable(CreateTable createTable, String colName) { + // Assemble index information. + Map> indexGroupByName = + getIndexNameGroupByColumnName(createTable.getIndexes()); + for (ColumnDefinition columnDefinition : createTable.getColumnDefinitions()) { + // Assemble column information. + String columnName = columnDefinition.getColumnName().replaceAll("`", ""); + if (!StringUtils.equals(colName, columnName)) { + continue; + } + String columnType = columnDefinition.getColDataType().toString(); + String[] columnSpecs = columnDefinition.getColumnSpecs().toArray(new String[0]); + String columnProperties = String.join(SPACE, columnSpecs); + boolean nullable = !columnProperties.contains(NOT_NULL); + String defaultValue = findPropertiesValue(columnSpecs, DEFAULT); + String comment = findPropertiesValue(columnSpecs, COMMENT); + List properties = getColumnProperties(columnProperties); + Optional.ofNullable(indexGroupByName.get(columnName)).ifPresent(properties::addAll); + return new JdbcColumn.Builder() + .withName(columnName) + .withType(typeConverter.toGravitinoType(columnType)) + .withNullable(nullable) + .withComment(comment) + .withDefaultValue("NULL".equals(defaultValue) ? null : defaultValue) + .withProperties(properties) + .build(); + } + throw new NoSuchColumnException( + "Column " + colName + " does not exist in table " + createTable.getTable().getName()); + } + + /** + * @param databaseName database name + * @param tableName table name + * @return + */ + private CreateTable loadCreateTable(String databaseName, String tableName) { + try (Connection connection = getConnection(databaseName)) { + try (Statement statement = connection.createStatement()) { + String showCreateTableSQL = "SHOW CREATE TABLE " + tableName; + ResultSet resultSet = statement.executeQuery(showCreateTableSQL); + + if (!resultSet.next()) { + throw new NoSuchTableException("Table " + tableName + " does not exist."); + } + String createTableSql = resultSet.getString(2); + return (CreateTable) CCJSqlParserUtil.parse(createTableSql); + } catch (JSQLParserException e) { + throw new GravitinoRuntimeException( + String.format("Failed to parse create table %s.%s sql", databaseName, tableName), e); + } + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + /** + * @param indexes table index information object + * @return Get index information grouped by column name. + */ + private static Map> getIndexNameGroupByColumnName(List indexes) { + return indexes == null + ? Collections.emptyMap() + : indexes.stream() + .flatMap( + index -> + index.getColumnsNames().stream() + .map( + s -> + new AbstractMap.SimpleEntry>( + s.replaceAll("`", ""), + new HashSet() { + { + add(index.getType()); + } + }))) + .collect( + Collectors.toMap( + AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue, + (set, other) -> { + set.addAll(other); + return set; + })); + } + + private List getColumnProperties(String columnProperties) { + List properties = new ArrayList<>(); + if (StringUtils.containsIgnoreCase(columnProperties, AUTO_INCREMENT)) { + properties.add(AUTO_INCREMENT); + } + return properties; + } + + @Override + protected Map extractPropertiesFromResultSet(ResultSet table) { + // We have rewritten the `load` method, so there is no need to implement this method + throw new UnsupportedOperationException("Extracting table properties is not supported yet"); + } + + @Override + protected String generateCreateTableSql( + String tableName, + JdbcColumn[] columns, + String comment, + Map properties, + Transform[] partitioning) { + if (ArrayUtils.isNotEmpty(partitioning)) { + throw new UnsupportedOperationException("Currently we do not support Partitioning in Mysql"); + } + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("CREATE TABLE ").append(tableName).append(" (\n"); + + // Add columns + for (int i = 0; i < columns.length; i++) { + JdbcColumn column = columns[i]; + sqlBuilder.append(SPACE).append(SPACE).append(column.name()); + + appendColumnDefinition(column, sqlBuilder); + // Add comma for the next column, unless it's the last one + if (i < columns.length - 1) { + sqlBuilder.append(",\n"); + } + } + sqlBuilder.append("\n)"); + // Add table properties if any + if (MapUtils.isNotEmpty(properties)) { + + StringJoiner joiner = new StringJoiner(SPACE + SPACE); + for (Map.Entry entry : properties.entrySet()) { + joiner.add(entry.getKey() + "=" + entry.getValue()); + } + sqlBuilder.append(joiner); + } + + // Add table comment if specified + if (StringUtils.isNotEmpty(comment)) { + sqlBuilder.append(" COMMENT='").append(comment).append("'"); + } + + // Return the generated SQL statement + return sqlBuilder.toString(); + } + + @Override + protected JdbcColumn extractJdbcColumnFromResultSet(ResultSet resultSet) { + // We have rewritten the `load` method, so there is no need to implement this method + throw new UnsupportedOperationException("Extracting table columns is not supported yet"); + } + + @Override + protected String generateRenameTableSql(String oldTableName, String newTableName) { + return "RENAME TABLE " + oldTableName + " TO " + newTableName; + } + + @Override + protected String generateDropTableSql(String tableName) { + return "DROP TABLE " + tableName; + } + + @Override + protected String generatePurgeTableSql(String tableName) { + throw new UnsupportedOperationException("Purge table is not supported for MySQL"); + } + + @Override + protected String generateAlterTableSql( + String databaseName, String tableName, TableChange... changes) { + // Not all operations require the original table information, so lazy loading is used here + CreateTable lazyLoadCreateTable = null; + TableChange.UpdateComment updateComment = null; + List setProperties = new ArrayList<>(); + List alterSql = new ArrayList<>(); + for (int i = 0; i < changes.length; i++) { + TableChange change = changes[i]; + if (change instanceof TableChange.UpdateComment) { + updateComment = (TableChange.UpdateComment) change; + } else if (change instanceof TableChange.SetProperty) { + // The set attribute needs to be added at the end. + setProperties.add(((TableChange.SetProperty) change)); + } else if (change instanceof TableChange.RemoveProperty) { + // mysql does not support deleting table attributes, it can be replaced by Set Property + throw new UnsupportedOperationException("Remove property is not supported yet"); + } else if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + alterSql.add(addColumnFieldDefinition(addColumn)); + } else if (change instanceof TableChange.RenameColumn) { + lazyLoadCreateTable = getOrCreateTable(databaseName, tableName, lazyLoadCreateTable); + TableChange.RenameColumn renameColumn = (TableChange.RenameColumn) change; + alterSql.add(renameColumnFieldDefinition(renameColumn, lazyLoadCreateTable)); + } else if (change instanceof TableChange.UpdateColumnType) { + lazyLoadCreateTable = getOrCreateTable(databaseName, tableName, lazyLoadCreateTable); + TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change; + alterSql.add(updateColumnTypeFieldDefinition(updateColumnType, lazyLoadCreateTable)); + } else if (change instanceof TableChange.UpdateColumnComment) { + lazyLoadCreateTable = getOrCreateTable(databaseName, tableName, lazyLoadCreateTable); + TableChange.UpdateColumnComment updateColumnComment = + (TableChange.UpdateColumnComment) change; + alterSql.add(updateColumnCommentFieldDefinition(updateColumnComment, lazyLoadCreateTable)); + } else if (change instanceof TableChange.UpdateColumnPosition) { + lazyLoadCreateTable = getOrCreateTable(databaseName, tableName, lazyLoadCreateTable); + TableChange.UpdateColumnPosition updateColumnPosition = + (TableChange.UpdateColumnPosition) change; + alterSql.add( + updateColumnPositionFieldDefinition(updateColumnPosition, lazyLoadCreateTable)); + } else if (change instanceof TableChange.DeleteColumn) { + TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change; + alterSql.add(deleteColumnFieldDefinition(deleteColumn)); + } + } + if (!setProperties.isEmpty()) { + alterSql.add(generateTableProperties(setProperties)); + } + + // Last modified comment + if (null != updateComment) { + alterSql.add("COMMENT '" + updateComment.getNewComment() + "'"); + } + + // Return the generated SQL statement + return "ALTER TABLE " + tableName + "\n" + String.join(",\n", alterSql) + ";"; + } + + private String generateTableProperties(List setProperties) { + return setProperties.stream() + .map( + setProperty -> + String.format("%s = %s", setProperty.getProperty(), setProperty.getValue())) + .collect(Collectors.joining(",\n")); + } + + private CreateTable getOrCreateTable( + String databaseName, String tableName, CreateTable lazyLoadCreateTable) { + return null != lazyLoadCreateTable + ? lazyLoadCreateTable + : loadCreateTable(databaseName, tableName); + } + + private String updateColumnCommentFieldDefinition( + TableChange.UpdateColumnComment updateColumnComment, CreateTable createTable) { + String newComment = updateColumnComment.getNewComment(); + return Arrays.stream(updateColumnComment.fieldNames()) + .filter(Objects::nonNull) + .map( + s -> { + JdbcColumn column = getJdbcColumnFromCreateTable(createTable, s); + column.getProperties().remove(PRIMARY_KEY); + JdbcColumn updateColumn = + new JdbcColumn.Builder() + .withName(s) + .withDefaultValue(column.getDefaultValue()) + .withNullable(column.nullable()) + .withProperties(column.getProperties()) + .withType(column.dataType()) + .withComment(newComment) + .build(); + return "MODIFY COLUMN " + + s + + appendColumnDefinition(updateColumn, new StringBuilder()); + }) + .collect(Collectors.joining(",\n")); + } + + private String addColumnFieldDefinition(TableChange.AddColumn addColumn) { + String dataType = typeConverter.fromGravitinoType(addColumn.getDataType()); + return Arrays.stream(addColumn.fieldNames()) + .filter(Objects::nonNull) + .map( + s -> { + StringBuilder columnDefinition = new StringBuilder(); + columnDefinition.append("ADD COLUMN ").append(s).append(SPACE).append(dataType); + // Append comment if available + if (addColumn.getComment() != null && !addColumn.getComment().isEmpty()) { + columnDefinition.append("COMMENT '").append(addColumn.getComment()).append("' "); + } + + // Append position if available + if (addColumn.getPosition() != null) { + if (addColumn.getPosition() instanceof TableChange.First) { + columnDefinition.append("FIRST"); + } else if (addColumn.getPosition() instanceof TableChange.After) { + TableChange.After afterPosition = (TableChange.After) addColumn.getPosition(); + columnDefinition.append("AFTER ").append(afterPosition.getColumn()); + } + } + return columnDefinition.toString(); + }) + .collect(Collectors.joining(", \n")); + } + + private String renameColumnFieldDefinition( + TableChange.RenameColumn renameColumn, CreateTable createTable) { + if (renameColumn.fieldNames().length > 1) { + throw new IllegalArgumentException("Rename column for multiple columns is not supported yet"); + } + JdbcColumn column = getJdbcColumnFromCreateTable(createTable, renameColumn.fieldNames()[0]); + StringBuilder sqlBuilder = + new StringBuilder( + "CHANGE COLUMN " + renameColumn.fieldNames()[0] + SPACE + renameColumn.getNewName()); + + // Changing the column name does not support setting the primary key. You need to delete the + // primary key first and then add the primary key after modifying the column name. + column.getProperties().remove(PRIMARY_KEY); + return appendColumnDefinition(column, sqlBuilder).toString(); + } + + private String updateColumnPositionFieldDefinition( + TableChange.UpdateColumnPosition updateColumnPosition, CreateTable createTable) { + return Arrays.stream(updateColumnPosition.fieldNames()) + .filter(Objects::nonNull) + .map( + s -> { + JdbcColumn column = getJdbcColumnFromCreateTable(createTable, s); + StringBuilder columnDefinition = new StringBuilder(); + columnDefinition.append("MODIFY COLUMN ").append(s); + column.getProperties().remove(PRIMARY_KEY); + appendColumnDefinition(column, columnDefinition); + if (updateColumnPosition.getPosition() instanceof TableChange.First) { + columnDefinition.append("FIRST"); + } else if (updateColumnPosition.getPosition() instanceof TableChange.After) { + TableChange.After afterPosition = + (TableChange.After) updateColumnPosition.getPosition(); + columnDefinition.append("AFTER ").append(afterPosition.getColumn()); + } + return columnDefinition.toString(); + }) + .collect(Collectors.joining(", \n")); + } + + private String deleteColumnFieldDefinition(TableChange.DeleteColumn deleteColumn) { + return Arrays.stream(deleteColumn.fieldNames()) + .filter(Objects::nonNull) + .map(s -> "DROP COLUMN " + s) + .collect(Collectors.joining(", \n")); + } + + private String updateColumnTypeFieldDefinition( + TableChange.UpdateColumnType updateColumnType, CreateTable lazyLoadCreateTable) { + return Arrays.stream(updateColumnType.fieldNames()) + .filter(Objects::nonNull) + .map( + s -> { + JdbcColumn column = getJdbcColumnFromCreateTable(lazyLoadCreateTable, s); + StringBuilder sqlBuilder = new StringBuilder("MODIFY COLUMN " + s); + JdbcColumn newColumn = + new JdbcColumn.Builder() + .withName(s) + .withType(updateColumnType.getNewDataType()) + .withComment(column.comment()) + // Modifying a field type does not require adding its attributes. If + // additional attributes are required, they must be modified separately. + .withProperties(null) + .withDefaultValue(null) + .withNullable(column.nullable()) + .build(); + return appendColumnDefinition(newColumn, sqlBuilder).toString(); + }) + .collect(Collectors.joining(", \n")); + } + + private StringBuilder appendColumnDefinition(JdbcColumn column, StringBuilder sqlBuilder) { + // Add data type + sqlBuilder + .append(SPACE) + .append(typeConverter.fromGravitinoType(column.dataType())) + .append(SPACE); + + // Add NOT NULL if the column is marked as such + if (!column.nullable()) { + sqlBuilder.append("NOT NULL "); + } + // Add DEFAULT value if specified + if (StringUtils.isNotEmpty(column.getDefaultValue())) { + sqlBuilder.append("DEFAULT '").append(column.getDefaultValue()).append("'").append(SPACE); + } else { + if (column.nullable()) { + sqlBuilder.append("DEFAULT NULL "); + } + } + // Add AUTO_INCREMENT if the column is marked as such + if (CollectionUtils.isNotEmpty(column.getProperties())) { + for (String property : column.getProperties()) { + sqlBuilder.append(property).append(SPACE); + } + } + // Add column comment if specified + if (StringUtils.isNotEmpty(column.comment())) { + sqlBuilder.append("COMMENT '").append(column.comment()).append("' "); + } + return sqlBuilder; + } + + private static Map parseOrderedKeyValuePairs(String[] input) { + Map keyValuePairs = new HashMap<>(); + parseOrderedKeyValuePairs(input, keyValuePairs); + return keyValuePairs; + } + + private static void parseOrderedKeyValuePairs(String[] input, Map keyValuePairs) { + if (0 >= input.length - 1) { + return; + } + int firstIndexOfEquals = findFirstIndexOfEquals(input, "="); + if (-1 != firstIndexOfEquals) { + // Found an equal sign, so this is a key-value pair + String key = joinWords(input, 0, firstIndexOfEquals - 1); + String value = input[firstIndexOfEquals + 1]; + + // Check if the value is enclosed in single quotes + if (value.startsWith("'") && value.endsWith("'")) { + // Remove single quotes + value = value.substring(1, value.length() - 1); + } + + keyValuePairs.put(key, value); + + // Recursively process the remaining elements + if (firstIndexOfEquals + 2 < input.length - 1) { + parseOrderedKeyValuePairs( + ArrayUtils.subarray(input, firstIndexOfEquals + 2, input.length), keyValuePairs); + } + } + } + + private static int findFirstIndexOfEquals(String[] array, String search) { + for (int i = 0; i < array.length; i++) { + if (search.equals(array[i])) { + return i; + } + } + return -1; + } + + private static String joinWords(String[] words, int start, int end) { + StringBuilder result = new StringBuilder(); + for (int i = start; i <= end; i++) { + if (i > start) { + result.append(SPACE); + } + result.append(words[i]); + } + return result.toString(); + } + + private String findPropertiesValue(String[] columnSpecs, String propertyKey) { + for (int i = 0; i < columnSpecs.length; i++) { + String columnSpec = columnSpecs[i]; + if (propertyKey.equalsIgnoreCase(columnSpec) && i < columnSpecs.length - 1) { + return columnSpecs[i + 1].replaceAll("'", SPACE).trim(); + } + } + return null; + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ede0239631b..e8f073a611a 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -43,7 +43,7 @@ publish-plugin = '1.2.0' rat-plugin = '0.8.0' shadow-plugin = "8.1.1" metrics = "4.2.19" - +jsqlparser = "4.2" [libraries] protobuf-java = { group = "com.google.protobuf", name = "protobuf-java", version.ref = "protoc" } @@ -113,6 +113,7 @@ jwt-impl = { group = "io.jsonwebtoken", name = "jjwt-impl", version.ref = "jwt"} jwt-gson = { group = "io.jsonwebtoken", name = "jjwt-gson", version.ref = "jwt"} metrics-core = { group = "io.dropwizard.metrics", name = "metrics-core", version.ref = "metrics" } metrics-jmx = { group = "io.dropwizard.metrics", name = "metrics-jmx", version.ref = "metrics" } +jsqlparser = { group = "com.github.jsqlparser", name = "jsqlparser", version.ref = "jsqlparser" } [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api"] diff --git a/integration-test/build.gradle.kts b/integration-test/build.gradle.kts index 26bd694f966..67c866f9ce0 100644 --- a/integration-test/build.gradle.kts +++ b/integration-test/build.gradle.kts @@ -23,6 +23,8 @@ dependencies { implementation(project(":core")) implementation(project(":catalogs:catalog-hive")) implementation(project(":catalogs:catalog-lakehouse-iceberg")) + implementation(project(":catalogs:catalog-jdbc-common")) + implementation(project(":catalogs:catalog-jdbc-mysql")) implementation(libs.guava) implementation(libs.bundles.log4j) implementation(libs.bundles.jersey) @@ -116,6 +118,9 @@ dependencies { testImplementation(libs.testcontainers) testImplementation(libs.testcontainers.junit.jupiter) testImplementation(libs.trino.jdbc) + testImplementation("org.testcontainers:testcontainers:1.19.0") + testImplementation("org.testcontainers:mysql:1.19.0") + testImplementation("mysql:mysql-connector-java:8.0.23") } /* Optimizing integration test execution conditions */ diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlAbstractIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlAbstractIT.java new file mode 100644 index 00000000000..367a1cd0516 --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlAbstractIT.java @@ -0,0 +1,61 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.catalog.mysql.operation; + +import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; +import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils; +import com.datastrato.gravitino.catalog.mysql.converter.MysqlExceptionConverter; +import com.datastrato.gravitino.catalog.mysql.converter.MysqlTypeConverter; +import com.datastrato.gravitino.catalog.mysql.operation.MysqlDatabaseOperations; +import com.datastrato.gravitino.catalog.mysql.operation.MysqlTableOperations; +import java.util.HashMap; +import javax.sql.DataSource; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.shaded.com.google.common.collect.Maps; + +public class TestMysqlAbstractIT { + + private static MySQLContainer MYSQL_CONTAINER; + + protected static MysqlDatabaseOperations MYSQL_DATABASE_OPERATIONS; + + protected static MysqlTableOperations MYSQL_TABLE_OPERATIONS; + + private static DataSource DATA_SOURCE; + + protected static final String TEST_DB_NAME = RandomUtils.nextInt(0, 10000) + "_test_db"; + + @BeforeAll + public static void startup() { + MYSQL_CONTAINER = + new MySQLContainer<>("mysql:latest") + .withDatabaseName(TEST_DB_NAME) + .withUsername("root") + .withPassword("testpassword"); + + MYSQL_CONTAINER.start(); + MYSQL_DATABASE_OPERATIONS = new MysqlDatabaseOperations(); + MYSQL_TABLE_OPERATIONS = new MysqlTableOperations(); + HashMap properties = Maps.newHashMap(); + properties.put(JdbcConfig.JDBC_URL.getKey(), MYSQL_CONTAINER.getJdbcUrl()); + properties.put(JdbcConfig.USERNAME.getKey(), MYSQL_CONTAINER.getUsername()); + properties.put(JdbcConfig.PASSWORD.getKey(), MYSQL_CONTAINER.getPassword()); + DATA_SOURCE = DataSourceUtils.createDataSource(properties); + MYSQL_DATABASE_OPERATIONS.initialize(DATA_SOURCE, new MysqlExceptionConverter()); + MYSQL_TABLE_OPERATIONS.initialize( + DATA_SOURCE, new MysqlExceptionConverter(), new MysqlTypeConverter()); + } + + @AfterAll + public static void stop() { + DataSourceUtils.closeDataSource(DATA_SOURCE); + if (null != MYSQL_CONTAINER) { + MYSQL_CONTAINER.stop(); + } + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlDatabaseOperations.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlDatabaseOperations.java new file mode 100644 index 00000000000..ea0596bafbe --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlDatabaseOperations.java @@ -0,0 +1,54 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.catalog.mysql.operation; + +import com.datastrato.gravitino.catalog.jdbc.JdbcSchema; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestMysqlDatabaseOperations extends TestMysqlAbstractIT { + + @Test + public void testBaseOperationDatabase() { + String databaseName = RandomUtils.nextInt(0, 10000) + "_ct_db"; + Map properties = new HashMap<>(); + properties.put("CHARACTER SET", "utf8mb3"); + properties.put("COLLATE", "utf8mb3_general_ci"); + // Mysql database creation does not support incoming comments. + String comment = null; + // create database. + MYSQL_DATABASE_OPERATIONS.create(databaseName, comment, properties); + + // load database. + JdbcSchema load = MYSQL_DATABASE_OPERATIONS.load(databaseName); + + Assertions.assertEquals(databaseName, load.name()); + Assertions.assertEquals(comment, load.comment()); + + Assertions.assertTrue(properties.values().containsAll(load.properties().values())); + + // delete database. + UnsupportedOperationException unsupportedOperationException = + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> MYSQL_DATABASE_OPERATIONS.delete(databaseName, true)); + Assertions.assertTrue( + unsupportedOperationException + .getMessage() + .contains("MySQL does not support CASCADE option for DROP DATABASE.")); + + Assertions.assertDoesNotThrow(() -> MYSQL_DATABASE_OPERATIONS.delete(databaseName, false)); + + Assertions.assertThrows( + NoSuchSchemaException.class, () -> MYSQL_DATABASE_OPERATIONS.load(databaseName)); + List databases = MYSQL_DATABASE_OPERATIONS.list(); + Assertions.assertFalse(databases.contains(databaseName)); + } +} diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlTableOperations.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlTableOperations.java new file mode 100644 index 00000000000..88e1963bf0e --- /dev/null +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/mysql/operation/TestMysqlTableOperations.java @@ -0,0 +1,358 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.integration.test.catalog.mysql.operation; + +import static com.datastrato.gravitino.catalog.mysql.operation.MysqlTableOperations.AUTO_INCREMENT; +import static com.datastrato.gravitino.catalog.mysql.operation.MysqlTableOperations.PRIMARY_KEY; + +import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; +import com.datastrato.gravitino.catalog.jdbc.JdbcTable; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import com.datastrato.gravitino.exceptions.NoSuchTableException; +import com.datastrato.gravitino.rel.TableChange; +import io.substrait.type.TypeCreator; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestMysqlTableOperations extends TestMysqlAbstractIT { + + @Test + public void testOperationTable() { + String tableName = RandomUtils.nextLong(1, 10000) + "_op_table"; + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + columns.add( + new JdbcColumn.Builder() + .withName("col_1") + .withType(TypeCreator.NULLABLE.STRING) + .withComment("test_comment") + .withNullable(true) + .build()); + columns.add( + new JdbcColumn.Builder() + .withName("col_2") + .withType(TypeCreator.NULLABLE.I32) + .withNullable(false) + .withComment("set primary key") + .withProperties( + new ArrayList() { + { + add(AUTO_INCREMENT); + add(PRIMARY_KEY); + } + }) + .build()); + columns.add( + new JdbcColumn.Builder() + .withName("col_3") + .withType(TypeCreator.NULLABLE.I32) + .withProperties( + new ArrayList() { + { + add("UNIQUE KEY"); + } + }) + .withNullable(true) + .build()); + columns.add( + new JdbcColumn.Builder() + .withName("col_4") + .withType(TypeCreator.NULLABLE.STRING) + .withDefaultValue("hello world") + .withNullable(false) + .build()); + Map properties = new HashMap<>(); + properties.put("ENGINE", "InnoDB"); + properties.put(AUTO_INCREMENT, "10"); + // create table + MYSQL_TABLE_OPERATIONS.create( + TEST_DB_NAME, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + properties, + null); + + // list table + List tables = MYSQL_TABLE_OPERATIONS.list(TEST_DB_NAME); + Assertions.assertTrue(tables.contains(tableName)); + + // load table + JdbcTable load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + assertionsTableInfo(tableName, tableComment, columns, properties, load); + + // rename table + String newName = "new_table"; + Assertions.assertDoesNotThrow( + () -> MYSQL_TABLE_OPERATIONS.rename(TEST_DB_NAME, tableName, newName)); + Assertions.assertDoesNotThrow(() -> MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, newName)); + + // alter table + JdbcColumn newColumn = + new JdbcColumn.Builder() + .withName("col_5") + .withType(TypeCreator.NULLABLE.STRING) + .withComment("new_add") + .withNullable(true) + .build(); + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, + newName, + TableChange.addColumn( + new String[] {newColumn.name()}, + newColumn.dataType(), + newColumn.comment(), + TableChange.ColumnPosition.after("col_1"))); + load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, newName); + List alterColumns = + new ArrayList() { + { + add(columns.get(0)); + add(newColumn); + add(columns.get(1)); + add(columns.get(2)); + add(columns.get(3)); + } + }; + assertionsTableInfo(newName, tableComment, alterColumns, properties, load); + + // delete column + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, newName, TableChange.deleteColumn(new String[] {newColumn.name()}, true)); + load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, newName); + assertionsTableInfo(newName, tableComment, columns, properties, load); + + GravitinoRuntimeException gravitinoRuntimeException = + Assertions.assertThrows( + GravitinoRuntimeException.class, + () -> + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, + newName, + TableChange.deleteColumn(new String[] {newColumn.name()}, true))); + + Assertions.assertTrue( + gravitinoRuntimeException + .getMessage() + .contains("Can't DROP 'col_5'; check that column/key exists")); + Assertions.assertDoesNotThrow(() -> MYSQL_TABLE_OPERATIONS.drop(TEST_DB_NAME, newName)); + Assertions.assertThrows( + NoSuchTableException.class, () -> MYSQL_TABLE_OPERATIONS.drop(TEST_DB_NAME, newName)); + } + + @Test + public void testAlterTable() { + String tableName = RandomUtils.nextLong(1, 10000) + "_al_table"; + String tableComment = "test_comment"; + List columns = new ArrayList<>(); + JdbcColumn col_1 = + new JdbcColumn.Builder() + .withName("col_1") + .withType(TypeCreator.NULLABLE.I32) + .withComment("id") + .withProperties( + new ArrayList() { + { + add(AUTO_INCREMENT); + add(PRIMARY_KEY); + } + }) + .withNullable(false) + .build(); + columns.add(col_1); + JdbcColumn col_2 = + new JdbcColumn.Builder() + .withName("col_2") + .withType(TypeCreator.NULLABLE.STRING) + .withComment("name") + .withProperties( + new ArrayList() { + { + add("UNIQUE KEY"); + } + }) + .withDefaultValue("hello world") + .withNullable(false) + .build(); + columns.add(col_2); + Map properties = new HashMap<>(); + properties.put("ENGINE", "InnoDB"); + properties.put(AUTO_INCREMENT, "10"); + // create table + MYSQL_TABLE_OPERATIONS.create( + TEST_DB_NAME, + tableName, + columns.toArray(new JdbcColumn[0]), + tableComment, + properties, + null); + JdbcTable load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + assertionsTableInfo(tableName, tableComment, columns, properties, load); + + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, + tableName, + TableChange.updateColumnType(new String[] {col_1.name()}, TypeCreator.NULLABLE.STRING)); + + load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + + // After modifying the type, some attributes of the corresponding column are not supported. + columns.clear(); + properties.remove(AUTO_INCREMENT); + col_1 = + new JdbcColumn.Builder() + .withName(col_1.name()) + .withType(TypeCreator.NULLABLE.STRING) + .withComment(col_1.comment()) + .withProperties( + new ArrayList() { + { + add(PRIMARY_KEY); + } + }) + .withNullable(col_1.nullable()) + .withDefaultValue(col_1.getDefaultValue()) + .build(); + columns.add(col_1); + columns.add(col_2); + assertionsTableInfo(tableName, tableComment, columns, properties, load); + + String newComment = "new_comment"; + // update table comment and column comment + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, + tableName, + TableChange.updateColumnType(new String[] {col_1.name()}, TypeCreator.NULLABLE.I32), + TableChange.updateColumnComment(new String[] {col_2.name()}, newComment)); + load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + + columns.clear(); + col_1 = + new JdbcColumn.Builder() + .withName(col_1.name()) + .withType(TypeCreator.NULLABLE.I32) + .withComment(col_1.comment()) + .withProperties(col_1.getProperties()) + .withNullable(col_1.nullable()) + .withDefaultValue(col_1.getDefaultValue()) + .build(); + col_2 = + new JdbcColumn.Builder() + .withName(col_2.name()) + .withType(col_2.dataType()) + .withComment(newComment) + .withProperties(col_2.getProperties()) + .withNullable(col_2.nullable()) + .withDefaultValue(col_2.getDefaultValue()) + .build(); + columns.add(col_1); + columns.add(col_2); + assertionsTableInfo(tableName, tableComment, columns, properties, load); + + String newColName_1 = "new_col_1"; + String newColName_2 = "new_col_2"; + // rename column + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, + tableName, + TableChange.renameColumn(new String[] {col_1.name()}, newColName_1), + TableChange.renameColumn(new String[] {col_2.name()}, newColName_2)); + + load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + + columns.clear(); + col_1 = + new JdbcColumn.Builder() + .withName(newColName_1) + .withType(col_1.dataType()) + .withComment(col_1.comment()) + .withProperties(col_1.getProperties()) + .withNullable(col_1.nullable()) + .withDefaultValue(col_1.getDefaultValue()) + .build(); + col_2 = + new JdbcColumn.Builder() + .withName(newColName_2) + .withType(col_2.dataType()) + .withComment(col_2.comment()) + .withProperties(col_2.getProperties()) + .withNullable(col_2.nullable()) + .withDefaultValue(col_2.getDefaultValue()) + .build(); + columns.add(col_1); + columns.add(col_2); + assertionsTableInfo(tableName, tableComment, columns, properties, load); + + newComment = "txt3"; + String newCol2Comment = "xxx"; + // update column position 、comment and add column、set table properties + MYSQL_TABLE_OPERATIONS.alterTable( + TEST_DB_NAME, + tableName, + TableChange.updateColumnPosition( + new String[] {newColName_1}, TableChange.ColumnPosition.after(newColName_2)), + TableChange.updateComment(newComment), + TableChange.addColumn( + new String[] {"col_3"}, + TypeCreator.NULLABLE.STRING, + "txt3", + TableChange.ColumnPosition.first()), + TableChange.updateColumnComment(new String[] {newColName_2}, newCol2Comment), + TableChange.setProperty("ROW_FORMAT", "DYNAMIC")); + load = MYSQL_TABLE_OPERATIONS.load(TEST_DB_NAME, tableName); + + columns.clear(); + + columns.add( + new JdbcColumn.Builder() + .withName("col_3") + .withType(TypeCreator.NULLABLE.STRING) + .withComment("txt3") + .build()); + columns.add( + new JdbcColumn.Builder() + .withName(col_2.name()) + .withType(col_2.dataType()) + .withComment(newCol2Comment) + .withProperties(col_2.getProperties()) + .withDefaultValue(col_2.getDefaultValue()) + .withNullable(col_2.nullable()) + .build()); + columns.add(col_1); + properties.put("ROW_FORMAT", "DYNAMIC"); + assertionsTableInfo(tableName, newComment, columns, properties, load); + } + + private static void assertionsTableInfo( + String tableName, + String tableComment, + List columns, + Map properties, + JdbcTable load) { + Assertions.assertEquals(tableName, load.name()); + Assertions.assertEquals(tableComment, load.comment()); + Assertions.assertEquals(columns.size(), load.columns().length); + for (int i = 0; i < columns.size(); i++) { + Assertions.assertEquals(columns.get(i).name(), load.columns()[i].name()); + Assertions.assertEquals(columns.get(i).dataType(), load.columns()[i].dataType()); + Assertions.assertEquals(columns.get(i).nullable(), load.columns()[i].nullable()); + Assertions.assertEquals(columns.get(i).comment(), load.columns()[i].comment()); + Assertions.assertEquals( + columns.get(i).getDefaultValue(), ((JdbcColumn) load.columns()[i]).getDefaultValue()); + if (null != columns.get(i).getProperties()) { + Assertions.assertEquals( + columns.get(i).getProperties(), ((JdbcColumn) load.columns()[i]).getProperties()); + } + } + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertEquals(entry.getValue(), load.properties().get(entry.getKey())); + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index ff551511664..54d031d607b 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -5,7 +5,7 @@ rootProject.name = "gravitino" include("api", "common", "core", "meta", "server", "integration-test", "server-common") -include("catalogs:catalog-hive", "catalogs:catalog-lakehouse-iceberg", "catalogs:catalog-jdbc-common") +include("catalogs:catalog-hive", "catalogs:catalog-lakehouse-iceberg", "catalogs:catalog-jdbc-common", "catalogs:catalog-jdbc-mysql") include("clients:client-java", "clients:client-java-runtime") include("trino-connector") include("web")