Skip to content

Commit dd1d52b

Browse files
committed
Support define owner for datasource api
1 parent 25cf227 commit dd1d52b

File tree

8 files changed

+84
-34
lines changed

8 files changed

+84
-34
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.apache.spark.sql.connector.catalog;
2+
3+
/**
4+
* An enumeration support for Role-Based Access Control(RBAC) extensions.
5+
*/
6+
public enum PrincipalType {
7+
USER,
8+
GROUP,
9+
ROLE
10+
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.spark.annotation.Experimental;
2121
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
2222
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
23+
import org.apache.spark.util.Utils;
2324

2425
import java.util.Arrays;
2526
import java.util.List;
@@ -64,9 +65,7 @@ public interface SupportsNamespaces extends CatalogPlugin {
6465
String PROP_OWNER_TYPE = "ownerType";
6566

6667
/**
67-
* The list of namespace ownership properties, cannot be used in `CREATE` syntax.
68-
*
69-
* Only support in:
68+
* The list of namespace ownership properties, which can be used in `ALTER` syntax:
7069
*
7170
* {{
7271
* ALTER (DATABASE|SCHEMA|NAMESPACE) SET OWNER ...
@@ -86,6 +85,18 @@ public interface SupportsNamespaces extends CatalogPlugin {
8685
List<String> REVERSED_PROPERTIES =
8786
Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_OWNER_NAME, PROP_OWNER_TYPE);
8887

88+
/**
89+
* Specify the default owner name for `CREATE` namespace.
90+
*
91+
*/
92+
default String defaultOwner() { return Utils.getCurrentUserName(); }
93+
94+
/**
95+
*
96+
* Specify the default owner type for `CREATE` namespace.
97+
*/
98+
default String defaultOwnerType() { return PrincipalType.USER.name(); }
99+
89100
/**
90101
* Return a default namespace for the catalog.
91102
* <p>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
3939
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
4040
import org.apache.spark.sql.catalyst.util.IntervalUtils
4141
import org.apache.spark.sql.catalyst.util.IntervalUtils.IntervalUnit
42-
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
42+
import org.apache.spark.sql.connector.catalog.{PrincipalType, SupportsNamespaces}
4343
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
4444
import org.apache.spark.sql.internal.SQLConf
4545
import org.apache.spark.sql.types._
@@ -2589,10 +2589,17 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
25892589
*/
25902590
override def visitSetNamespaceOwner(ctx: SetNamespaceOwnerContext): LogicalPlan = {
25912591
withOrigin(ctx) {
2592-
AlterNamespaceSetOwner(
2593-
visitMultipartIdentifier(ctx.multipartIdentifier),
2594-
ctx.identifier.getText,
2595-
ctx.ownerType.getText)
2592+
val ownerType = ctx.ownerType.getText
2593+
try {
2594+
AlterNamespaceSetOwner(
2595+
visitMultipartIdentifier(ctx.multipartIdentifier),
2596+
ctx.identifier.getText,
2597+
PrincipalType.valueOf(ownerType).name)
2598+
} catch {
2599+
case _: IllegalArgumentException =>
2600+
throw new ParseException(s"$ownerType is not supported, need to be one of" +
2601+
s" ${PrincipalType.values().mkString("[", ",", "]")}", ctx)
2602+
}
25962603
}
25972604
}
25982605

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateNamespaceExec.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
2222
import org.apache.spark.sql.catalyst.InternalRow
2323
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException
2424
import org.apache.spark.sql.catalyst.expressions.Attribute
25-
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
25+
import org.apache.spark.sql.connector.catalog.{PrincipalType, SupportsNamespaces}
2626

2727
/**
2828
* Physical plan node for creating a namespace.
@@ -35,11 +35,14 @@ case class CreateNamespaceExec(
3535
extends V2CommandExec {
3636
override protected def run(): Seq[InternalRow] = {
3737
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
38+
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
3839

3940
val ns = namespace.toArray
4041
if (!catalog.namespaceExists(ns)) {
4142
try {
42-
catalog.createNamespace(ns, properties.asJava)
43+
val ownership = Map(PROP_OWNER_NAME -> catalog.defaultOwner(),
44+
PROP_OWNER_TYPE -> PrincipalType.valueOf(catalog.defaultOwnerType).name())
45+
catalog.createNamespace(ns, (properties ++ ownership).asJava)
4346
} catch {
4447
case _: NamespaceAlreadyExistsException if ifNotExists =>
4548
logWarning(s"Namespace ${namespace.quoted} was created concurrently. Ignoring.")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeNamespaceExec.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ case class DescribeNamespaceExec(
4545
rows += toCatalystRow("Namespace Name", ns.last)
4646
rows += toCatalystRow("Description", metadata.get(PROP_COMMENT))
4747
rows += toCatalystRow("Location", metadata.get(PROP_LOCATION))
48-
rows += toCatalystRow("Owner Name", metadata.get(PROP_OWNER_NAME))
49-
rows += toCatalystRow("Owner Type", metadata.get(PROP_OWNER_TYPE))
48+
rows += toCatalystRow("Owner Name", metadata.getOrDefault(PROP_OWNER_NAME, ""))
49+
rows += toCatalystRow("Owner Type", metadata.getOrDefault(PROP_OWNER_TYPE, ""))
5050

5151
if (isExtended) {
5252
val properties = metadata.asScala -- REVERSED_PROPERTIES.asScala
5353
if (properties.nonEmpty) {
54-
rows += toCatalystRow("Properties", properties.mkString("(", ",", ")"))
54+
rows += toCatalystRow("Properties", properties.toSeq.mkString("(", ",", ")"))
5555
}
5656
}
5757
rows

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,15 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
226226
namespace match {
227227
case Array(db) =>
228228
val metadata = catalog.getDatabaseMetadata(db).toMetadata
229+
// validate that this catalog's reserved properties are not removed
230+
changes.foreach {
231+
case remove: RemoveProperty
232+
if SupportsNamespaces.REVERSED_PROPERTIES.contains(remove.property) =>
233+
throw new UnsupportedOperationException(
234+
s"Cannot remove reserved property: ${remove.property}")
235+
case _ =>
236+
}
237+
229238
catalog.alterDatabase(
230239
toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))
231240

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
2929
import org.apache.spark.sql.sources.SimpleScanSource
3030
import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType}
3131
import org.apache.spark.sql.util.CaseInsensitiveStringMap
32+
import org.apache.spark.util.Utils
3233

3334
class DataSourceV2SQLSuite
3435
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = true)
@@ -933,7 +934,9 @@ class DataSourceV2SQLSuite
933934
assert(description === Seq(
934935
Row("Namespace Name", "ns2"),
935936
Row("Description", "test namespace"),
936-
Row("Location", "/tmp/ns_test")
937+
Row("Location", "/tmp/ns_test"),
938+
Row("Owner Name", Utils.getCurrentUserName()),
939+
Row("Owner Type", PrincipalType.USER.name())
937940
))
938941
}
939942
}
@@ -948,6 +951,8 @@ class DataSourceV2SQLSuite
948951
Row("Namespace Name", "ns2"),
949952
Row("Description", "test namespace"),
950953
Row("Location", "/tmp/ns_test"),
954+
Row("Owner Name", Utils.getCurrentUserName()),
955+
Row("Owner Type", PrincipalType.USER.name()),
951956
Row("Properties", "((a,b),(b,a),(c,c))")
952957
))
953958
}
@@ -962,7 +967,25 @@ class DataSourceV2SQLSuite
962967
assert(descriptionDf.collect() === Seq(
963968
Row("Namespace Name", "ns2"),
964969
Row("Description", "test namespace"),
965-
Row("Location", "/tmp/ns_test_2")
970+
Row("Location", "/tmp/ns_test_2"),
971+
Row("Owner Name", Utils.getCurrentUserName()),
972+
Row("Owner Type", PrincipalType.USER.name())
973+
))
974+
}
975+
}
976+
977+
test("AlterNamespaceSetOwner using v2 catalog") {
978+
withNamespace("testcat.ns1.ns2") {
979+
sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1.ns2 COMMENT " +
980+
"'test namespace' LOCATION '/tmp/ns_test_3'")
981+
sql("ALTER NAMESPACE testcat.ns1.ns2 SET OWNER ROLE adminRole")
982+
val descriptionDf = sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1.ns2")
983+
assert(descriptionDf.collect() === Seq(
984+
Row("Namespace Name", "ns2"),
985+
Row("Description", "test namespace"),
986+
Row("Location", "/tmp/ns_test_3"),
987+
Row("Owner Name", "adminRole"),
988+
Row("Owner Type", PrincipalType.ROLE.name())
966989
))
967990
}
968991
}

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1010,31 +1010,18 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
10101010
assert(exc.getMessage.contains(testNs.quoted))
10111011
}
10121012

1013-
test("alterNamespace: fail to remove location") {
1013+
test("alterNamespace: fail to remove reversed properties") {
10141014
val catalog = newCatalog()
10151015

10161016
catalog.createNamespace(testNs, emptyProps)
10171017

1018-
val exc = intercept[UnsupportedOperationException] {
1019-
catalog.alterNamespace(testNs, NamespaceChange.removeProperty("location"))
1020-
}
1021-
1022-
assert(exc.getMessage.contains("Cannot remove reserved property: location"))
1023-
1024-
catalog.dropNamespace(testNs)
1025-
}
1018+
SupportsNamespaces.REVERSED_PROPERTIES.asScala.foreach { p =>
1019+
val exc = intercept[UnsupportedOperationException] {
1020+
catalog.alterNamespace(testNs, NamespaceChange.removeProperty(p))
1021+
}
1022+
assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))
10261023

1027-
test("alterNamespace: fail to remove comment") {
1028-
val catalog = newCatalog()
1029-
1030-
catalog.createNamespace(testNs, Map("comment" -> "test db").asJava)
1031-
1032-
val exc = intercept[UnsupportedOperationException] {
1033-
catalog.alterNamespace(testNs, NamespaceChange.removeProperty("comment"))
10341024
}
1035-
1036-
assert(exc.getMessage.contains("Cannot remove reserved property: comment"))
1037-
10381025
catalog.dropNamespace(testNs)
10391026
}
10401027
}

0 commit comments

Comments
 (0)