Skip to content

Commit

Permalink
[#676] feat(jdbc): Support for table operations in JDBC catalog. (#771)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

We need to add operator methods for JDBC table-related operations to
facilitate the implementation of connecting with tables.

### Why are the changes needed?

Fix: #676 

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

---------

Co-authored-by: Clearvive <clearvive@datastrato.com>
  • Loading branch information
Clearvive and Clearvive authored Nov 23, 2023
1 parent 01a104f commit ad8e98b
Show file tree
Hide file tree
Showing 12 changed files with 813 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
Expand All @@ -29,7 +30,8 @@ protected CatalogOperations newOps(Map<String, String> config) {
entity(),
createExceptionConverter(),
createJdbcTypeConverter(),
createJdbcDatabaseOperations());
createJdbcDatabaseOperations(),
createJdbcTableOperations());
ops.initialize(config);
return ops;
}
Expand Down Expand Up @@ -58,4 +60,7 @@ protected JdbcExceptionConverter createExceptionConverter() {
* @return The {@link JdbcDatabaseOperations} to be used by the catalog to manage databases in the
*/
protected abstract JdbcDatabaseOperations createJdbcDatabaseOperations();

/** @return The {@link JdbcTableOperations} to be used by the catalog to manage tables in the */
protected abstract JdbcTableOperations createJdbcTableOperations();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.DatabaseOperation;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcTableOperations;
import com.datastrato.gravitino.catalog.jdbc.operation.TableOperation;
import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
Expand All @@ -31,14 +34,17 @@
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.utils.MapUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -62,7 +68,9 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas

private final JdbcTypeConverter jdbcTypeConverter;

private final JdbcDatabaseOperations jdbcDatabaseOperations;
private final DatabaseOperation databaseOperation;

private final TableOperation tableOperation;

private DataSource dataSource;

Expand All @@ -72,17 +80,20 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas
* @param entity The catalog entity associated with this operations instance.
* @param exceptionConverter The exception converter to be used by the operations.
* @param jdbcTypeConverter The type converter to be used by the operations.
* @param jdbcDatabaseOperations The database operations to be used by the operations.
* @param databaseOperation The database operations to be used by the operations.
* @param tableOperation The table operations to be used by the operations.
*/
public JdbcCatalogOperations(
CatalogEntity entity,
JdbcExceptionConverter exceptionConverter,
JdbcTypeConverter jdbcTypeConverter,
JdbcDatabaseOperations jdbcDatabaseOperations) {
JdbcDatabaseOperations databaseOperation,
JdbcTableOperations tableOperation) {
this.entity = entity;
this.exceptionConverter = exceptionConverter;
this.jdbcTypeConverter = jdbcTypeConverter;
this.jdbcDatabaseOperations = jdbcDatabaseOperations;
this.databaseOperation = databaseOperation;
this.tableOperation = tableOperation;
}

/**
Expand All @@ -98,7 +109,8 @@ public void initialize(Map<String, String> conf) throws RuntimeException {
Maps.newHashMap(MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX));
JdbcConfig jdbcConfig = new JdbcConfig(resultConf);
this.dataSource = DataSourceUtils.createDataSource(jdbcConfig);
this.jdbcDatabaseOperations.initialize(dataSource, exceptionConverter);
this.databaseOperation.initialize(dataSource, exceptionConverter);
this.tableOperation.initialize(dataSource, exceptionConverter, jdbcTypeConverter);
this.jdbcCatalogPropertiesMetadata = new JdbcCatalogPropertiesMetadata();
this.jdbcTablePropertiesMetadata = new JdbcTablePropertiesMetadata();
this.jdbcSchemaPropertiesMetadata = new JdbcSchemaPropertiesMetadata();
Expand All @@ -119,7 +131,7 @@ public void close() {
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
List<String> schemaNames = jdbcDatabaseOperations.list();
List<String> schemaNames = databaseOperation.list();
return schemaNames.stream()
.map(db -> NameIdentifier.of(namespace, db))
.toArray(NameIdentifier[]::new);
Expand Down Expand Up @@ -152,7 +164,7 @@ public JdbcSchema createSchema(
}
HashMap<String, String> resultProperties = Maps.newHashMap(properties);
resultProperties.remove(StringIdentifier.ID_KEY);
jdbcDatabaseOperations.create(
databaseOperation.create(
ident.name(), StringIdentifier.addToComment(identifier, comment), resultProperties);
return new JdbcSchema.Builder()
.withName(ident.name())
Expand All @@ -171,23 +183,22 @@ public JdbcSchema createSchema(
*/
@Override
public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
JdbcSchema load = jdbcDatabaseOperations.load(ident.name());
JdbcSchema load = databaseOperation.load(ident.name());
String comment = load.comment();
StringIdentifier id = StringIdentifier.fromComment(comment);
if (id == null) {
LOG.warn("The comment {} does not contain gravitino id attribute", comment);
return load;
} else {
Map<String, String> properties =
load.properties() == null ? Maps.newHashMap() : Maps.newHashMap(load.properties());
StringIdentifier.addToProperties(id, properties);
return new JdbcSchema.Builder()
.withAuditInfo(load.auditInfo())
.withName(load.name())
.withComment(load.comment())
.withProperties(properties)
.build();
}
Map<String, String> properties =
load.properties() == null ? Maps.newHashMap() : Maps.newHashMap(load.properties());
StringIdentifier.addToProperties(id, properties);
return new JdbcSchema.Builder()
.withAuditInfo(load.auditInfo())
.withName(load.name())
.withComment(load.comment())
.withProperties(properties)
.build();
}

/**
Expand All @@ -214,7 +225,7 @@ public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
*/
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
jdbcDatabaseOperations.delete(ident.name(), cascade);
databaseOperation.delete(ident.name(), cascade);
return true;
}

Expand All @@ -227,7 +238,10 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
*/
@Override
public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
String databaseName = NameIdentifier.of(namespace.levels()).name();
return tableOperation.list(databaseName).stream()
.map(table -> NameIdentifier.of(namespace, table))
.toArray(NameIdentifier[]::new);
}

/**
Expand All @@ -239,22 +253,59 @@ public NameIdentifier[] listTables(Namespace namespace) throws NoSuchSchemaExcep
*/
@Override
public Table loadTable(NameIdentifier tableIdent) throws NoSuchTableException {
throw new UnsupportedOperationException();
String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name();
String tableName = tableIdent.name();
JdbcTable load = tableOperation.load(databaseName, tableName);
String comment = load.comment();
StringIdentifier id = StringIdentifier.fromComment(comment);
if (id == null) {
LOG.warn(
"The table {} comment {} does not contain gravitino id attribute", tableName, comment);
return load;
}
Map<String, String> properties =
load.properties() == null ? Maps.newHashMap() : Maps.newHashMap(load.properties());
StringIdentifier.addToProperties(id, properties);
return new JdbcTable.Builder()
.withAuditInfo(load.auditInfo())
.withName(tableName)
.withColumns(load.columns())
.withComment(load.comment())
.withProperties(properties)
.build();
}

/**
* Apply the {@link TableChange change} to an existing Jdbc table.
*
* @param tableIdent The identifier of the table to alter.
* @param changes The changes to apply to the table.
* @return This method always throws UnsupportedOperationException.
* @return The altered JdbcTable instance representing the table.
* @throws NoSuchTableException This exception will not be thrown in this method.
* @throws IllegalArgumentException This exception will not be thrown in this method.
*/
@Override
public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
throw new UnsupportedOperationException();
Optional<TableChange> renameTableOptional =
Arrays.stream(changes)
.filter(tableChange -> tableChange instanceof TableChange.RenameTable)
.reduce((a, b) -> b);
if (renameTableOptional.isPresent()) {
String otherChange =
Arrays.stream(changes)
.filter(tableChange -> !(tableChange instanceof TableChange.RenameTable))
.map(String::valueOf)
.collect(Collectors.joining("\n"));
Preconditions.checkArgument(
StringUtils.isEmpty(otherChange),
String.format(
"The operation to change the table name cannot be performed together with other operations."
+ "The list of operations that you cannot perform includes: \n %s",
otherChange));
return renameTable(tableIdent, (TableChange.RenameTable) renameTableOptional.get());
}
return internalAlterTable(tableIdent, changes);
}

/**
Expand All @@ -265,7 +316,9 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... changes)
*/
@Override
public boolean dropTable(NameIdentifier tableIdent) {
throw new UnsupportedOperationException();
String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name();
tableOperation.drop(databaseName, tableIdent.name());
return true;
}

/**
Expand All @@ -290,7 +343,50 @@ public Table createTable(
Distribution distribution,
SortOrder[] sortOrders)
throws NoSuchSchemaException, TableAlreadyExistsException {
throw new UnsupportedOperationException();
Preconditions.checkArgument(
null == distribution || distribution == Distributions.NONE,
"jdbc-catalog does not support distribution");
Preconditions.checkArgument(
null == sortOrders || sortOrders.length == 0, "jdbc-catalog does not support sort orders");

StringIdentifier identifier =
Preconditions.checkNotNull(
StringIdentifier.fromProperties(properties),
"The gravitino id attribute does not exist in properties");
// The properties we write to the database do not require the id field, so it needs to be
// removed.
HashMap<String, String> resultProperties = Maps.newHashMap(properties);
resultProperties.remove(StringIdentifier.ID_KEY);
JdbcColumn[] jdbcColumns =
Arrays.stream(columns)
.map(
column ->
new JdbcColumn.Builder()
.withName(column.name())
.withType(column.dataType())
.withComment(column.comment())
.withNullable(column.nullable())
.build())
.toArray(JdbcColumn[]::new);
String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name();
String tableName = tableIdent.name();

tableOperation.create(
databaseName,
tableName,
jdbcColumns,
StringIdentifier.addToComment(identifier, comment),
resultProperties,
partitioning);

return new JdbcTable.Builder()
.withAuditInfo(AuditInfo.EMPTY)
.withName(tableName)
.withColumns(columns)
.withComment(comment)
.withProperties(properties)
.withPartitioning(partitioning)
.build();
}

/**
Expand All @@ -302,7 +398,32 @@ public Table createTable(
*/
@Override
public boolean purgeTable(NameIdentifier tableIdent) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name();
tableOperation.purge(databaseName, tableIdent.name());
return true;
}

/**
* Perform name change operations on the Jdbc.
*
* @param tableIdent tableIdent of this table.
* @param renameTable Table Change to modify the table name.
* @return Returns the table for Iceberg.
* @throws NoSuchTableException
* @throws IllegalArgumentException
*/
private Table renameTable(NameIdentifier tableIdent, TableChange.RenameTable renameTable)
throws NoSuchTableException, IllegalArgumentException {
String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name();
tableOperation.rename(databaseName, tableIdent.name(), renameTable.getNewName());
return loadTable(NameIdentifier.of(tableIdent.namespace(), renameTable.getNewName()));
}

private Table internalAlterTable(NameIdentifier tableIdent, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name();
tableOperation.alterTable(databaseName, tableIdent.name(), changes);
return loadTable(tableIdent);
}

// TODO. We should figure out a better way to get the current user from servlet container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ public abstract class JdbcTypeConverter {
* @param type
* @return
*/
abstract Type toGravitinoType(String type);
public abstract Type toGravitinoType(String type);

/**
* Convert from JDBC type to Gravitino type
*
* @param type
* @return
*/
abstract String fromGravitinoType(Type type);
public abstract String fromGravitinoType(Type type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.datastrato.gravitino.catalog.jdbc.JdbcSchema;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
Expand All @@ -19,24 +20,23 @@ public interface DatabaseOperation {
*
* @param dataSource The data source to use for the operations.
* @param exceptionMapper The exception mapper to use for the operations.
* @throws RuntimeException
*/
void initialize(final DataSource dataSource, final JdbcExceptionConverter exceptionMapper)
throws RuntimeException;
void initialize(DataSource dataSource, JdbcExceptionConverter exceptionMapper);

/**
* Creates a database with the given name and comment.
*
* @param databaseName The name of the database to create.
* @param comment The comment of the database to create.
* @param databaseName The name of the database.
* @param comment The comment of the database.
*/
void create(String databaseName, String comment, Map<String, String> properties);
void create(String databaseName, String comment, Map<String, String> properties)
throws SchemaAlreadyExistsException;

/**
* @param databaseName The name of the database to check.
* @param cascade If set to true, drops all the tables in the database as well.
*/
void delete(String databaseName, boolean cascade);
void delete(String databaseName, boolean cascade) throws NoSuchSchemaException;

/** @return The list name of databases. */
List<String> list();
Expand Down
Loading

0 comments on commit ad8e98b

Please sign in to comment.