Skip to content

Commit

Permalink
[#777] feat(jdbc-mysql): Support for mysql catalog in Gravitino.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Nov 30, 2023
1 parent 10b914e commit 50cc354
Show file tree
Hide file tree
Showing 29 changed files with 1,538 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.exceptions;

/** Exception thrown when a column with specified name is not existed. */
public class NoSuchColumnException extends NotFoundException {

public NoSuchColumnException(String message) {
super(message);
}

public NoSuchColumnException(String message, Throwable cause) {
super(message, cause);
}
}
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ tasks {

val copyCatalogLibAndConfigs by registering(Copy::class) {
dependsOn(":catalogs:catalog-hive:copyLibAndConfig",
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig")
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-jdbc-mysql:copyLibAndConfig")
}

clean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void close() {
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
List<String> schemaNames = databaseOperation.list();
List<String> schemaNames = databaseOperation.listDatabases();
return schemaNames.stream()
.map(db -> NameIdentifier.of(namespace, db))
.toArray(NameIdentifier[]::new);
Expand Down Expand Up @@ -239,7 +239,7 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
String databaseName = NameIdentifier.of(namespace.levels()).name();
return tableOperation.list(databaseName).stream()
return tableOperation.listTables(databaseName).stream()
.map(table -> NameIdentifier.of(namespace, table))
.toArray(NameIdentifier[]::new);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
package com.datastrato.gravitino.catalog.jdbc;

import com.datastrato.gravitino.catalog.rel.BaseColumn;
import java.util.List;

/** Represents a column in the Jdbc column. */
public class JdbcColumn extends BaseColumn {
private String defaultValue;

private List<String> properties;

private JdbcColumn() {}

public String getDefaultValue() {
return defaultValue;
}

public List<String> getProperties() {
return properties;
}

/** A builder class for constructing JdbcColumn instances. */
public static class Builder extends BaseColumnBuilder<Builder, JdbcColumn> {

Expand All @@ -24,11 +31,19 @@ public static class Builder extends BaseColumnBuilder<Builder, JdbcColumn> {
*/
private String defaultValue;

/** Attribute value of the field, such as AUTO_INCREMENT, PRIMARY KEY, etc. */
private List<String> properties;

public Builder withDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
return this;
}

public Builder withProperties(List<String> properties) {
this.properties = properties;
return this;
}

/**
* Internal method to build a JdbcColumn instance using the provided values.
*
Expand All @@ -42,6 +57,7 @@ protected JdbcColumn internalBuild() {
jdbcColumn.dataType = dataType;
jdbcColumn.nullable = nullable;
jdbcColumn.defaultValue = defaultValue;
jdbcColumn.properties = properties;
return jdbcColumn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@

import com.datastrato.gravitino.rel.types.Type;

public abstract class JdbcTypeConverter {
public abstract class JdbcTypeConverter<FROM, TO> {

/**
* Convert from Gravitino type to JDBC type
*
* @param type
* @return
*/
public abstract Type toGravitinoType(String type);
public abstract Type toGravitinoType(FROM type);

/**
* Convert from JDBC type to Gravitino type
*
* @param type
* @return
*/
public abstract String fromGravitinoType(Type type);
public abstract TO fromGravitinoType(Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void create(String databaseName, String comment, Map<String, String> properties)
void delete(String databaseName, boolean cascade) throws NoSuchSchemaException;

/** @return The list name of databases. */
List<String> list();
List<String> listDatabases();

/**
* @param databaseName The name of the database to check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,18 +58,35 @@ public void delete(String databaseName, boolean cascade) throws NoSuchSchemaExce
}
}

@Override
public List<String> listDatabases() {
List<String> databaseNames = new ArrayList<>();
try (final Connection connection = this.dataSource.getConnection()) {
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getCatalogs();
while (resultSet.next()) {
String databaseName = resultSet.getString("TABLE_CAT");
databaseNames.add(databaseName);
}
return databaseNames;
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
}
}

/**
* @param databaseName The name of the database.
* @param comment The comment of the database.
* @param properties The properties of the database.
* @return the SQL statement to create a database with the given name and comment.
*/
abstract String generateCreateDatabaseSql(
protected abstract String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties);

/**
* @param databaseName The name of the database.
* @param cascade cascade If set to true, drops all the tables in the schema as well.
* @return the SQL statement to drop a database with the given name.
*/
abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
protected abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.jdbc.operation;

import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
Expand Down Expand Up @@ -32,8 +31,6 @@ public abstract class JdbcTableOperations implements TableOperation {

protected static final Logger LOG = LoggerFactory.getLogger(JdbcTableOperations.class);

protected static final String[] TABLE_TYPES = {"TABLE"};

protected DataSource dataSource;
protected JdbcExceptionConverter exceptionMapper;
protected JdbcTypeConverter typeConverter;
Expand Down Expand Up @@ -80,7 +77,7 @@ public void drop(String databaseName, String tableName) throws NoSuchTableExcept
}

@Override
public List<String> list(String databaseName) throws NoSuchSchemaException {
public List<String> listTables(String databaseName) throws NoSuchSchemaException {
try (Connection connection = getConnection(databaseName)) {
final List<String> names = Lists.newArrayList();
try (ResultSet tables = getTables(connection)) {
Expand Down Expand Up @@ -111,14 +108,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
try (ResultSet column = getColumns(connection, tableName)) {
List<JdbcColumn> result = new ArrayList<>();
while (column.next()) {
result.add(
new JdbcColumn.Builder()
.withName(column.getString("COLUMN_NAME"))
.withComment(column.getString("REMARKS"))
.withType(typeConverter.toGravitinoType(column.getString("TYPE_NAME")))
.withNullable(column.getInt("NULLABLE") == DatabaseMetaData.columnNullable)
.withDefaultValue(column.getString("COLUMN_DEF"))
.build());
result.add(extractJdbcColumnFromResultSet(column));
}
jdbcColumns = result.toArray(new JdbcColumn[0]);
}
Expand Down Expand Up @@ -157,7 +147,8 @@ public void alterTable(String databaseName, String tableName, TableChange... cha
throws NoSuchTableException {
LOG.info("Attempting to alter table {} from database {}", tableName, databaseName);
try (Connection connection = getConnection(databaseName)) {
JdbcConnectorUtils.executeUpdate(connection, generateAlterTableSql(tableName, changes));
JdbcConnectorUtils.executeUpdate(
connection, generateAlterTableSql(databaseName, tableName, changes));
LOG.info("Alter table {} from database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
Expand All @@ -178,13 +169,14 @@ public void purge(String databaseName, String tableName) throws NoSuchTableExcep
protected ResultSet getTables(Connection connection) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String databaseName = connection.getSchema();
return metaData.getTables(databaseName, databaseName, null, TABLE_TYPES);
return metaData.getTables(databaseName, databaseName, null, JdbcConnectorUtils.TABLE_TYPES);
}

protected ResultSet getTable(Connection connection, String tableName) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String databaseName = connection.getSchema();
return metaData.getTables(databaseName, databaseName, tableName, TABLE_TYPES);
return metaData.getTables(
databaseName, databaseName, tableName, JdbcConnectorUtils.TABLE_TYPES);
}

protected ResultSet getColumns(Connection connection, String tableName) throws SQLException {
Expand All @@ -199,6 +191,9 @@ protected ResultSet getColumns(Connection connection, String tableName) throws S
*/
protected abstract Map<String, String> extractPropertiesFromResultSet(ResultSet table);

protected abstract JdbcColumn extractJdbcColumnFromResultSet(ResultSet column)
throws SQLException;

protected abstract String generateCreateTableSql(
String tableName,
JdbcColumn[] columns,
Expand All @@ -212,7 +207,8 @@ protected abstract String generateCreateTableSql(

protected abstract String generatePurgeTableSql(String tableName);

protected abstract String generateAlterTableSql(String tableName, TableChange... changes);
protected abstract String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes);

protected Connection getConnection(String schema) throws SQLException {
Connection connection = dataSource.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void create(
* @param databaseName The name of the database.
* @return A list of table names in the database.
*/
List<String> list(String databaseName) throws NoSuchSchemaException;
List<String> listTables(String databaseName) throws NoSuchSchemaException;

/**
* @param databaseName The name of the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

public final class JdbcConnectorUtils {

public static final String[] TABLE_TYPES = {"TABLE"};

private JdbcConnectorUtils() {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;

public class SqliteTypeConverter extends JdbcTypeConverter {
public class SqliteTypeConverter extends JdbcTypeConverter<String, String> {

protected static final Map<Type, String> GRAVITINO_TO_SQLITE_MAPPING = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean exist(String databaseName) {
}

@Override
public List<String> list() {
public List<String> listDatabases() {
File file = new File(dbPath);
Preconditions.checkArgument(file.exists(), "Database path %s does not exist", dbPath);
return Arrays.stream(Objects.requireNonNull(file.listFiles()))
Expand All @@ -81,13 +81,13 @@ public void delete(String databaseName) {
}

@Override
String generateCreateDatabaseSql(
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
return null;
}

@Override
String generateDropDatabaseSql(String databaseName, boolean cascade) {
public String generateDropDatabaseSql(String databaseName, boolean cascade) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -68,10 +69,22 @@ protected String generatePurgeTableSql(String tableName) {
}

@Override
protected String generateAlterTableSql(String tableName, TableChange... changes) {
protected String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes) {
throw new UnsupportedOperationException("Alter table is not supported in sqlite.");
}

@Override
protected JdbcColumn extractJdbcColumnFromResultSet(ResultSet resultSet) throws SQLException {
return new JdbcColumn.Builder()
.withName(resultSet.getString("COLUMN_NAME"))
.withComment(null)
.withType(typeConverter.toGravitinoType(resultSet.getString("TYPE_NAME")))
.withNullable(resultSet.getBoolean("NULLABLE"))
.withDefaultValue(resultSet.getString("COLUMN_DEF"))
.build();
}

@Override
protected Map<String, String> extractPropertiesFromResultSet(ResultSet table) {
// Sqlite does not support table properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ public void testOperationDatabase() {
Assertions.assertDoesNotThrow(() -> JDBC_DATABASE_OPERATIONS.create(database2, null, null));

// list database
List<String> listDatabases = JDBC_DATABASE_OPERATIONS.list();
List<String> listDatabases = JDBC_DATABASE_OPERATIONS.listDatabases();
Assertions.assertEquals(2, listDatabases.size());
Assertions.assertTrue(listDatabases.contains(database1));
Assertions.assertTrue(listDatabases.contains(database2));

// drop database
JDBC_DATABASE_OPERATIONS.delete(database1);
List<String> databases = JDBC_DATABASE_OPERATIONS.list();
List<String> databases = JDBC_DATABASE_OPERATIONS.listDatabases();
Assertions.assertFalse(databases.contains(database1));
Assertions.assertNotNull(JDBC_DATABASE_OPERATIONS.load(database2));
JDBC_DATABASE_OPERATIONS.delete(database2);
Expand Down
Loading

0 comments on commit 50cc354

Please sign in to comment.