Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#777] feat(jdbc-mysql): Support for mysql catalog in Gravitino. #786

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.exceptions;

/** Exception thrown when a column with specified name is not existed. */
public class NoSuchColumnException extends NotFoundException {

public NoSuchColumnException(String message) {
super(message);
}

public NoSuchColumnException(String message, Throwable cause) {
super(message, cause);
}
}
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,8 @@ tasks {

val copyCatalogLibAndConfigs by registering(Copy::class) {
dependsOn(":catalogs:catalog-hive:copyLibAndConfig",
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig")
":catalogs:catalog-lakehouse-iceberg:copyLibAndConfig",
":catalogs:catalog-jdbc-mysql:copyLibAndConfig")
}

clean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void close() {
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
List<String> schemaNames = databaseOperation.list();
List<String> schemaNames = databaseOperation.listDatabases();
return schemaNames.stream()
.map(db -> NameIdentifier.of(namespace, db))
.toArray(NameIdentifier[]::new);
Expand Down Expand Up @@ -239,7 +239,7 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
String databaseName = NameIdentifier.of(namespace.levels()).name();
return tableOperation.list(databaseName).stream()
return tableOperation.listTables(databaseName).stream()
.map(table -> NameIdentifier.of(namespace, table))
.toArray(NameIdentifier[]::new);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
package com.datastrato.gravitino.catalog.jdbc;

import com.datastrato.gravitino.catalog.rel.BaseColumn;
import java.util.List;

/** Represents a column in the Jdbc column. */
public class JdbcColumn extends BaseColumn {
private String defaultValue;

private List<String> properties;

private JdbcColumn() {}

public String getDefaultValue() {
return defaultValue;
}

public List<String> getProperties() {
Clearvive marked this conversation as resolved.
Show resolved Hide resolved
return properties;
}

/** A builder class for constructing JdbcColumn instances. */
public static class Builder extends BaseColumnBuilder<Builder, JdbcColumn> {

Expand All @@ -24,11 +31,19 @@ public static class Builder extends BaseColumnBuilder<Builder, JdbcColumn> {
*/
private String defaultValue;

/** Attribute value of the field, such as AUTO_INCREMENT, PRIMARY KEY, etc. */
private List<String> properties;

public Builder withDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
return this;
}

public Builder withProperties(List<String> properties) {
this.properties = properties;
return this;
}

/**
* Internal method to build a JdbcColumn instance using the provided values.
*
Expand All @@ -42,6 +57,7 @@ protected JdbcColumn internalBuild() {
jdbcColumn.dataType = dataType;
jdbcColumn.nullable = nullable;
jdbcColumn.defaultValue = defaultValue;
jdbcColumn.properties = properties;
return jdbcColumn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@

import com.datastrato.gravitino.rel.types.Type;

public abstract class JdbcTypeConverter {
public abstract class JdbcTypeConverter<FROM, TO> {
Clearvive marked this conversation as resolved.
Show resolved Hide resolved

/**
* Convert from Gravitino type to JDBC type
*
* @param type
* @return
*/
public abstract Type toGravitinoType(String type);
public abstract Type toGravitinoType(FROM type);

/**
* Convert from JDBC type to Gravitino type
*
* @param type
* @return
*/
public abstract String fromGravitinoType(Type type);
public abstract TO fromGravitinoType(Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void create(String databaseName, String comment, Map<String, String> properties)
void delete(String databaseName, boolean cascade) throws NoSuchSchemaException;

/** @return The list name of databases. */
List<String> list();
List<String> listDatabases();

/**
* @param databaseName The name of the database to check.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,18 +58,35 @@ public void delete(String databaseName, boolean cascade) throws NoSuchSchemaExce
}
}

@Override
public List<String> listDatabases() {
List<String> databaseNames = new ArrayList<>();
try (final Connection connection = this.dataSource.getConnection()) {
Clearvive marked this conversation as resolved.
Show resolved Hide resolved
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getCatalogs();
while (resultSet.next()) {
String databaseName = resultSet.getString("TABLE_CAT");
databaseNames.add(databaseName);
}
return databaseNames;
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
}
}

/**
* @param databaseName The name of the database.
* @param comment The comment of the database.
* @param properties The properties of the database.
* @return the SQL statement to create a database with the given name and comment.
*/
abstract String generateCreateDatabaseSql(
protected abstract String generateCreateDatabaseSql(
Clearvive marked this conversation as resolved.
Show resolved Hide resolved
String databaseName, String comment, Map<String, String> properties);

/**
* @param databaseName The name of the database.
* @param cascade cascade If set to true, drops all the tables in the schema as well.
* @return the SQL statement to drop a database with the given name.
*/
abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
protected abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.jdbc.operation;

import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
Expand Down Expand Up @@ -32,8 +31,6 @@ public abstract class JdbcTableOperations implements TableOperation {

protected static final Logger LOG = LoggerFactory.getLogger(JdbcTableOperations.class);

protected static final String[] TABLE_TYPES = {"TABLE"};

protected DataSource dataSource;
protected JdbcExceptionConverter exceptionMapper;
protected JdbcTypeConverter typeConverter;
Expand Down Expand Up @@ -80,7 +77,7 @@ public void drop(String databaseName, String tableName) throws NoSuchTableExcept
}

@Override
public List<String> list(String databaseName) throws NoSuchSchemaException {
public List<String> listTables(String databaseName) throws NoSuchSchemaException {
try (Connection connection = getConnection(databaseName)) {
final List<String> names = Lists.newArrayList();
try (ResultSet tables = getTables(connection)) {
Expand Down Expand Up @@ -111,14 +108,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
try (ResultSet column = getColumns(connection, tableName)) {
List<JdbcColumn> result = new ArrayList<>();
while (column.next()) {
result.add(
new JdbcColumn.Builder()
.withName(column.getString("COLUMN_NAME"))
.withComment(column.getString("REMARKS"))
.withType(typeConverter.toGravitinoType(column.getString("TYPE_NAME")))
.withNullable(column.getInt("NULLABLE") == DatabaseMetaData.columnNullable)
.withDefaultValue(column.getString("COLUMN_DEF"))
.build());
result.add(extractJdbcColumnFromResultSet(column));
}
jdbcColumns = result.toArray(new JdbcColumn[0]);
}
Expand Down Expand Up @@ -157,7 +147,8 @@ public void alterTable(String databaseName, String tableName, TableChange... cha
throws NoSuchTableException {
LOG.info("Attempting to alter table {} from database {}", tableName, databaseName);
try (Connection connection = getConnection(databaseName)) {
JdbcConnectorUtils.executeUpdate(connection, generateAlterTableSql(tableName, changes));
JdbcConnectorUtils.executeUpdate(
connection, generateAlterTableSql(databaseName, tableName, changes));
LOG.info("Alter table {} from database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
Expand All @@ -178,13 +169,14 @@ public void purge(String databaseName, String tableName) throws NoSuchTableExcep
protected ResultSet getTables(Connection connection) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String databaseName = connection.getSchema();
return metaData.getTables(databaseName, databaseName, null, TABLE_TYPES);
return metaData.getTables(databaseName, databaseName, null, JdbcConnectorUtils.TABLE_TYPES);
}

protected ResultSet getTable(Connection connection, String tableName) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String databaseName = connection.getSchema();
return metaData.getTables(databaseName, databaseName, tableName, TABLE_TYPES);
return metaData.getTables(
databaseName, databaseName, tableName, JdbcConnectorUtils.TABLE_TYPES);
}

protected ResultSet getColumns(Connection connection, String tableName) throws SQLException {
Expand All @@ -199,6 +191,9 @@ protected ResultSet getColumns(Connection connection, String tableName) throws S
*/
protected abstract Map<String, String> extractPropertiesFromResultSet(ResultSet table);

protected abstract JdbcColumn extractJdbcColumnFromResultSet(ResultSet column)
throws SQLException;

protected abstract String generateCreateTableSql(
String tableName,
JdbcColumn[] columns,
Expand All @@ -212,7 +207,8 @@ protected abstract String generateCreateTableSql(

protected abstract String generatePurgeTableSql(String tableName);

protected abstract String generateAlterTableSql(String tableName, TableChange... changes);
protected abstract String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes);

protected Connection getConnection(String schema) throws SQLException {
Connection connection = dataSource.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void create(
* @param databaseName The name of the database.
* @return A list of table names in the database.
*/
List<String> list(String databaseName) throws NoSuchSchemaException;
List<String> listTables(String databaseName) throws NoSuchSchemaException;

/**
* @param databaseName The name of the database.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

public final class JdbcConnectorUtils {

public static final String[] TABLE_TYPES = {"TABLE"};
Clearvive marked this conversation as resolved.
Show resolved Hide resolved

private JdbcConnectorUtils() {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;

public class SqliteTypeConverter extends JdbcTypeConverter {
public class SqliteTypeConverter extends JdbcTypeConverter<String, String> {

protected static final Map<Type, String> GRAVITINO_TO_SQLITE_MAPPING = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public boolean exist(String databaseName) {
}

@Override
public List<String> list() {
public List<String> listDatabases() {
File file = new File(dbPath);
Preconditions.checkArgument(file.exists(), "Database path %s does not exist", dbPath);
return Arrays.stream(Objects.requireNonNull(file.listFiles()))
Expand All @@ -81,13 +81,13 @@ public void delete(String databaseName) {
}

@Override
String generateCreateDatabaseSql(
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
return null;
}

@Override
String generateDropDatabaseSql(String databaseName, boolean cascade) {
public String generateDropDatabaseSql(String databaseName, boolean cascade) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -68,10 +69,22 @@ protected String generatePurgeTableSql(String tableName) {
}

@Override
protected String generateAlterTableSql(String tableName, TableChange... changes) {
protected String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes) {
throw new UnsupportedOperationException("Alter table is not supported in sqlite.");
}

@Override
protected JdbcColumn extractJdbcColumnFromResultSet(ResultSet resultSet) throws SQLException {
return new JdbcColumn.Builder()
.withName(resultSet.getString("COLUMN_NAME"))
.withComment(null)
Clearvive marked this conversation as resolved.
Show resolved Hide resolved
.withType(typeConverter.toGravitinoType(resultSet.getString("TYPE_NAME")))
.withNullable(resultSet.getBoolean("NULLABLE"))
.withDefaultValue(resultSet.getString("COLUMN_DEF"))
.build();
}

@Override
protected Map<String, String> extractPropertiesFromResultSet(ResultSet table) {
// Sqlite does not support table properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ public void testOperationDatabase() {
Assertions.assertDoesNotThrow(() -> JDBC_DATABASE_OPERATIONS.create(database2, null, null));

// list database
List<String> listDatabases = JDBC_DATABASE_OPERATIONS.list();
List<String> listDatabases = JDBC_DATABASE_OPERATIONS.listDatabases();
Assertions.assertEquals(2, listDatabases.size());
Assertions.assertTrue(listDatabases.contains(database1));
Assertions.assertTrue(listDatabases.contains(database2));

// drop database
JDBC_DATABASE_OPERATIONS.delete(database1);
List<String> databases = JDBC_DATABASE_OPERATIONS.list();
List<String> databases = JDBC_DATABASE_OPERATIONS.listDatabases();
Assertions.assertFalse(databases.contains(database1));
Assertions.assertNotNull(JDBC_DATABASE_OPERATIONS.load(database2));
JDBC_DATABASE_OPERATIONS.delete(database2);
Expand Down
Loading