Skip to content

Commit

Permalink
[#2401] improvement(spark-connector): support alter namespace operati…
Browse files Browse the repository at this point in the history
…on (#2407)

### What changes were proposed in this pull request?

Support alter namespace operation implementation.

### Why are the changes needed?

Support to set namespace properties in Spark sql like:

```
ALTER DATABASE inventory SET DBPROPERTIES ('Edited-by' = 'John', 'Edit-date' = '01/01/2001');
```

Fix: #2401

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- `SparkIT#testAlterSchema`
  • Loading branch information
SteNicholas authored Mar 6, 2024
1 parent ec2fd57 commit 26911d1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,39 @@ void testCreateAndLoadSchema() {
Assertions.assertTrue(StringUtils.isBlank(properties));

testDatabaseName = "t_create2";
String testDatabaseLocation = "/tmp/" + testDatabaseName;
sql(
String.format(
"CREATE DATABASE %s COMMENT 'comment' LOCATION '/user'\n"
+ " WITH DBPROPERTIES (ID=001);",
testDatabaseName));
"CREATE DATABASE %s COMMENT 'comment' LOCATION '%s'\n" + " WITH DBPROPERTIES (ID=001);",
testDatabaseName, testDatabaseLocation));
databaseMeta = getDatabaseMetadata(testDatabaseName);
String comment = databaseMeta.get("Comment");
Assertions.assertEquals("comment", comment);
Assertions.assertEquals("datastrato", databaseMeta.get("Owner"));
// underlying catalog may change /user to file:/user
Assertions.assertTrue(databaseMeta.get("Location").contains("/user"));
// underlying catalog may change /tmp/t_create2 to file:/tmp/t_create2
Assertions.assertTrue(databaseMeta.get("Location").contains(testDatabaseLocation));
properties = databaseMeta.get("Properties");
Assertions.assertEquals("((ID,001))", properties);
}

@Test
void testAlterSchema() {
String testDatabaseName = "t_alter";
sql("CREATE DATABASE " + testDatabaseName);
Assertions.assertTrue(
StringUtils.isBlank(getDatabaseMetadata(testDatabaseName).get("Properties")));

sql(String.format("ALTER DATABASE %s SET DBPROPERTIES ('ID'='001')", testDatabaseName));
Assertions.assertEquals("((ID,001))", getDatabaseMetadata(testDatabaseName).get("Properties"));

// Hive metastore doesn't support alter database location, therefore this test method
// doesn't verify ALTER DATABASE database_name SET LOCATION 'new_location'.

Assertions.assertThrowsExactly(
NoSuchNamespaceException.class,
() -> sql("ALTER DATABASE notExists SET DBPROPERTIES ('ID'='001')"));
}

@Test
void testDropSchema() {
String testDatabaseName = "t_drop";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.spark.GravitinoSparkConfig;
import com.google.common.base.Preconditions;
import java.util.Arrays;
Expand All @@ -30,6 +31,7 @@
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.NamespaceChange.SetProperty;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
Expand Down Expand Up @@ -175,7 +177,28 @@ public void createNamespace(String[] namespace, Map<String, String> metadata)
@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
throw new NotSupportedException("Doesn't support altering namespace");
validateNamespace(namespace);
SchemaChange[] schemaChanges =
Arrays.stream(changes)
.map(
change -> {
if (change instanceof SetProperty) {
SetProperty setProperty = ((SetProperty) change);
return SchemaChange.setProperty(setProperty.property(), setProperty.value());
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported namespace change %s", change.getClass().getName()));
}
})
.toArray(SchemaChange[]::new);
try {
gravitinoCatalogClient
.asSchemas()
.alterSchema(NameIdentifier.of(metalakeName, catalogName, namespace[0]), schemaChanges);
} catch (NoSuchSchemaException e) {
throw new NoSuchNamespaceException(namespace);
}
}

@Override
Expand Down

0 comments on commit 26911d1

Please sign in to comment.