Skip to content

Commit

Permalink
[#777] feat(jdbc-mysql): Support for mysql catalog in Gravitino.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Nov 24, 2023
1 parent ad8e98b commit 87404ba
Show file tree
Hide file tree
Showing 20 changed files with 1,405 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.exceptions;

/** Exception thrown when a column with specified name is not existed. */
public class NoSuchColumnException extends NotFoundException {

public NoSuchColumnException(String message) {
super(message);
}

public NoSuchColumnException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
package com.datastrato.gravitino.catalog.jdbc;

import com.datastrato.gravitino.catalog.rel.BaseColumn;
import java.util.List;

/** Represents a column in the Jdbc column. */
public class JdbcColumn extends BaseColumn {
private String defaultValue;

private List<String> properties;

private JdbcColumn() {}

public String getDefaultValue() {
return defaultValue;
}

public List<String> getProperties() {
return properties;
}

/** A builder class for constructing JdbcColumn instances. */
public static class Builder extends BaseColumnBuilder<Builder, JdbcColumn> {

Expand All @@ -24,11 +31,19 @@ public static class Builder extends BaseColumnBuilder<Builder, JdbcColumn> {
*/
private String defaultValue;

/** Attribute value of the field, such as AUTO_INCREMENT, PRIMARY KEY, etc. */
private List<String> properties;

public Builder withDefaultValue(String defaultValue) {
this.defaultValue = defaultValue;
return this;
}

public Builder withProperties(List<String> properties) {
this.properties = properties;
return this;
}

/**
* Internal method to build a JdbcColumn instance using the provided values.
*
Expand All @@ -42,6 +57,7 @@ protected JdbcColumn internalBuild() {
jdbcColumn.dataType = dataType;
jdbcColumn.nullable = nullable;
jdbcColumn.defaultValue = defaultValue;
jdbcColumn.properties = properties;
return jdbcColumn;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
Expand Down Expand Up @@ -54,18 +58,34 @@ public void delete(String databaseName, boolean cascade) throws NoSuchSchemaExce
}
}

@Override
public List<String> list() {
List<String> tableNames = new ArrayList<>();
try (final Connection connection = this.dataSource.getConnection()) {
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getTables(null, null, null, JdbcConnectorUtils.TABLE_TYPES);
while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME");
tableNames.add(tableName);
}
return tableNames;
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
}
}

/**
* @param databaseName The name of the database.
* @param comment The comment of the database.
* @return the SQL statement to create a database with the given name and comment.
*/
abstract String generateCreateDatabaseSql(
public abstract String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties);

/**
* @param databaseName The name of the database.
* @param cascade cascade If set to true, drops all the tables in the schema as well.
* @return the SQL statement to drop a database with the given name.
*/
abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
public abstract String generateDropDatabaseSql(String databaseName, boolean cascade);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.jdbc.operation;

import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
Expand Down Expand Up @@ -32,8 +31,6 @@ public abstract class JdbcTableOperations implements TableOperation {

protected static final Logger LOG = LoggerFactory.getLogger(JdbcTableOperations.class);

protected static final String[] TABLE_TYPES = {"TABLE"};

protected DataSource dataSource;
protected JdbcExceptionConverter exceptionMapper;
protected JdbcTypeConverter typeConverter;
Expand Down Expand Up @@ -111,14 +108,7 @@ public JdbcTable load(String databaseName, String tableName) throws NoSuchTableE
try (ResultSet column = getColumns(connection, tableName)) {
List<JdbcColumn> result = new ArrayList<>();
while (column.next()) {
result.add(
new JdbcColumn.Builder()
.withName(column.getString("COLUMN_NAME"))
.withComment(column.getString("REMARKS"))
.withType(typeConverter.toGravitinoType(column.getString("TYPE_NAME")))
.withNullable(column.getInt("NULLABLE") == DatabaseMetaData.columnNullable)
.withDefaultValue(column.getString("COLUMN_DEF"))
.build());
result.add(extractJdbcColumnFromResultSet(column));
}
jdbcColumns = result.toArray(new JdbcColumn[0]);
}
Expand Down Expand Up @@ -157,7 +147,8 @@ public void alterTable(String databaseName, String tableName, TableChange... cha
throws NoSuchTableException {
LOG.info("Attempting to alter table {} from database {}", tableName, databaseName);
try (Connection connection = getConnection(databaseName)) {
JdbcConnectorUtils.executeUpdate(connection, generateAlterTableSql(tableName, changes));
JdbcConnectorUtils.executeUpdate(
connection, generateAlterTableSql(databaseName, tableName, changes));
LOG.info("Alter table {} from database {}", tableName, databaseName);
} catch (final SQLException se) {
throw this.exceptionMapper.toGravitinoException(se);
Expand All @@ -178,13 +169,14 @@ public void purge(String databaseName, String tableName) throws NoSuchTableExcep
protected ResultSet getTables(Connection connection) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String databaseName = connection.getSchema();
return metaData.getTables(databaseName, databaseName, null, TABLE_TYPES);
return metaData.getTables(databaseName, databaseName, null, JdbcConnectorUtils.TABLE_TYPES);
}

protected ResultSet getTable(Connection connection, String tableName) throws SQLException {
final DatabaseMetaData metaData = connection.getMetaData();
String databaseName = connection.getSchema();
return metaData.getTables(databaseName, databaseName, tableName, TABLE_TYPES);
return metaData.getTables(
databaseName, databaseName, tableName, JdbcConnectorUtils.TABLE_TYPES);
}

protected ResultSet getColumns(Connection connection, String tableName) throws SQLException {
Expand All @@ -199,6 +191,9 @@ protected ResultSet getColumns(Connection connection, String tableName) throws S
*/
protected abstract Map<String, String> extractPropertiesFromResultSet(ResultSet table);

protected abstract JdbcColumn extractJdbcColumnFromResultSet(ResultSet column)
throws SQLException;

protected abstract String generateCreateTableSql(
String tableName,
JdbcColumn[] columns,
Expand All @@ -212,7 +207,8 @@ protected abstract String generateCreateTableSql(

protected abstract String generatePurgeTableSql(String tableName);

protected abstract String generateAlterTableSql(String tableName, TableChange... changes);
protected abstract String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes);

protected Connection getConnection(String schema) throws SQLException {
Connection connection = dataSource.getConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

public final class JdbcConnectorUtils {

public static final String[] TABLE_TYPES = {"TABLE"};

private JdbcConnectorUtils() {}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ public void delete(String databaseName) {
}

@Override
String generateCreateDatabaseSql(
public String generateCreateDatabaseSql(
String databaseName, String comment, Map<String, String> properties) {
return null;
}

@Override
String generateDropDatabaseSql(String databaseName, boolean cascade) {
public String generateDropDatabaseSql(String databaseName, boolean cascade) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;

Expand Down Expand Up @@ -68,10 +69,22 @@ protected String generatePurgeTableSql(String tableName) {
}

@Override
protected String generateAlterTableSql(String tableName, TableChange... changes) {
protected String generateAlterTableSql(
String databaseName, String tableName, TableChange... changes) {
throw new UnsupportedOperationException("Alter table is not supported in sqlite.");
}

@Override
protected JdbcColumn extractJdbcColumnFromResultSet(ResultSet resultSet) throws SQLException {
return new JdbcColumn.Builder()
.withName(resultSet.getString("COLUMN_NAME"))
.withComment(null)
.withType(typeConverter.toGravitinoType(resultSet.getString("TYPE_NAME")))
.withNullable(resultSet.getBoolean("NULLABLE"))
.withDefaultValue(resultSet.getString("COLUMN_DEF"))
.build();
}

@Override
protected Map<String, String> extractPropertiesFromResultSet(ResultSet table) {
// Sqlite does not support table properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void testOperationTable() {
JdbcTable loadTable = JDBC_TABLE_OPERATIONS.load(DATABASE_NAME, table1);
Assertions.assertNotNull(loadTable);
Assertions.assertEquals(table1, loadTable.name());
Assertions.assertEquals(null, loadTable.comment());
Assertions.assertNull(loadTable.comment());
Assertions.assertEquals(properties, loadTable.properties());
Assertions.assertEquals(jdbcColumns.length, loadTable.columns().length);
Map<String, JdbcColumn> createColumnMap =
Expand All @@ -140,7 +140,7 @@ public void testOperationTable() {
Assertions.assertEquals(jdbcColumn.name(), column.name());
Assertions.assertEquals(jdbcColumn.comment(), column.comment());
Assertions.assertEquals(jdbcColumn.dataType(), column.dataType());
Assertions.assertEquals(jdbcColumn.nullable(), ((JdbcColumn) column).nullable());
Assertions.assertEquals(jdbcColumn.nullable(), column.nullable());
Assertions.assertEquals(
jdbcColumn.getDefaultValue(), ((JdbcColumn) column).getDefaultValue());
}
Expand Down
31 changes: 31 additions & 0 deletions catalogs/catalog-jdbc-mysql/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
description = "catalog-jdbc-mysql"

plugins {
`maven-publish`
id("java")
id("idea")
id("com.diffplug.spotless")
}

dependencies {
implementation(project(":common"))
implementation(project(":core"))
implementation(project(":api"))
implementation(project(":catalogs:catalog-jdbc-common"))
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.jsqlparser)
implementation(libs.substrait.java.core) {
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.datatype")
exclude("com.fasterxml.jackson.dataformat")
exclude("com.google.protobuf")
exclude("com.google.code.findbugs")
exclude("org.slf4j")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.mysql;

import com.datastrato.gravitino.catalog.jdbc.JdbcCatalog;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.catalog.mysql.converter.MysqlExceptionConverter;
import com.datastrato.gravitino.catalog.mysql.converter.MysqlTypeConverter;
import com.datastrato.gravitino.catalog.mysql.operation.MysqlDatabaseOperations;
import com.datastrato.gravitino.catalog.mysql.operation.MysqlTableOperations;

/** Implementation of an Jdbc catalog in Gravitino. */
public abstract class MysqlCatalog extends JdbcCatalog {

@Override
public String shortName() {
return "mysql";
}

@Override
protected JdbcExceptionConverter createExceptionConverter() {
return new MysqlExceptionConverter();
}

@Override
protected JdbcTypeConverter createJdbcTypeConverter() {
return new MysqlTypeConverter();
}

@Override
protected JdbcDatabaseOperations createJdbcDatabaseOperations() {
return new MysqlDatabaseOperations();
}

@Override
protected JdbcTableOperations createJdbcTableOperations() {
return new MysqlTableOperations();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.mysql.converter;

import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.exceptions.GravitinoRuntimeException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import java.sql.SQLException;

/** Exception converter to gravitino exception for MySQL. */
public class MysqlExceptionConverter extends JdbcExceptionConverter {

@Override
public GravitinoRuntimeException toGravitinoException(SQLException se) {
switch (se.getErrorCode()) {
case 1007:
return new SchemaAlreadyExistsException(se.getMessage(), se);
case 1050:
return new TableAlreadyExistsException(se.getMessage(), se);
case 1008:
return new NoSuchSchemaException(se.getMessage(), se);
case 1146:
case 1051:
return new NoSuchTableException(se.getMessage(), se);
default:
return new GravitinoRuntimeException(se.getMessage(), se);
}
}
}
Loading

0 comments on commit 87404ba

Please sign in to comment.