V2SessionCatalog.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTa
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog}
import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, FunctionCatalog, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
+import org.apache.spark.sql.connector.catalog.TableChange.{ColumnChange, DeleteColumn, RenameColumn}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -149,26 +150,47 @@ class V2SessionCatalog(catalog: SessionCatalog)
throw QueryCompilationErrors.noSuchTableError(ident)
}

-    val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
-    val schema = CatalogV2Util.applySchemaChanges(
-      catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE")
-    val comment = properties.get(TableCatalog.PROP_COMMENT)
-    val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
-    val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
-    val storage = if (location.isDefined) {
-      catalogTable.storage.copy(locationUri = location)
-    } else {
-      catalogTable.storage
-    }

+    val (columnChanges, otherChanges) = changes.toSeq.partition(change =>
+      change.isInstanceOf[ColumnChange] &&
+        // Changes not supported by alterTableDataSchema
+        !change.isInstanceOf[RenameColumn] && !change.isInstanceOf[DeleteColumn]
Comment on lines +156 to +157

Contributor:
This seems awkward. In SessionCatalog I see there is a comment "not supporting dropping columns *yet*" (emphasis added). Should we instead make the change to allow this within alterTableDataSchema? That could make this much cleaner, with a clear separation between schema changes applied via alterTableDataSchema and other changes applied via alterTable.
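
(Editorial aside, for illustration: if alterTableDataSchema did accept renames and drops, the partition in this patch could collapse to a purely type-based split. A minimal sketch under that assumption; this is not what the PR currently does:)

    // Hypothetical: assumes SessionCatalog.alterTableDataSchema also handled
    // RenameColumn and DeleteColumn, which it does not today.
    val (columnChanges, otherChanges) =
      changes.toSeq.partition(_.isInstanceOf[ColumnChange])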

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, a somewhat related question: is the right approach instead to make HiveExternalCatalog.alterTable() support modifying the schema? That seems more in line with other implementations of alterTable. I see this comment was added as part of PR #14155:

   * Note: As of now, this doesn't support altering table schema, partition column names and bucket
   * specification. We will ignore them even if users do specify different values for these fields.

It's not clear to me whether this was intentional or something that was meant to be built on top of / fixed later. @cloud-fan, thoughts here?
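
(Editorial aside, for context: the quoted note means a caller cannot change a table's schema through alterTable alone; schema updates go through the dedicated data-schema call. A hedged sketch of the two ExternalCatalog entry points as a caller sees them; the variable names are illustrative only:)

    // alterTable takes a full CatalogTable but, per the note above, ignores any
    // schema change in it; alterTableDataSchema is the call that applies one.
    externalCatalog.alterTable(updatedTableDefinition)
    externalCatalog.alterTableDataSchema(db, table, newDataSchema)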

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your point about RenameColumn and DeleteColumn, but I am not sure which option is best. Since these changes are not supported, I thought it better to keep the current behavior.

+    )

+    if (columnChanges.size > 0) {
+      val schema = CatalogV2Util.applySchemaChanges(
+        catalogTable.schema, columnChanges, catalogTable.provider, "ALTER TABLE")
+
+      try {
+        catalog.alterTableDataSchema(ident.asTableIdentifier, schema)
+      } catch {
+        case _: NoSuchTableException =>
+          throw QueryCompilationErrors.noSuchTableError(ident)
+      }
+    }

-    try {
-      catalog.alterTable(
-        catalogTable.copy(
-          properties = properties, schema = schema, owner = owner, comment = comment,
-          storage = storage))
-    } catch {
-      case _: NoSuchTableException =>
-        throw QueryCompilationErrors.noSuchTableError(ident)
-    }
+    if (otherChanges.size > 0) {
+      val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, otherChanges)
+      val schema = CatalogV2Util.applySchemaChanges(
+        catalogTable.schema, otherChanges, catalogTable.provider, "ALTER TABLE")
+      val comment = properties.get(TableCatalog.PROP_COMMENT)
+      val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+      val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+      val storage = if (location.isDefined) {
+        catalogTable.storage.copy(locationUri = location)
+      } else {
+        catalogTable.storage
+      }
+
+      try {
+        catalog.alterTable(
+          catalogTable.copy(
+            properties = properties, schema = schema, owner = owner, comment = comment,
+            storage = storage))
+      } catch {
+        case _: NoSuchTableException =>
+          throw QueryCompilationErrors.noSuchTableError(ident)
+      }
+    }

loadTable(ident)
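For orientation (editorial, not part of the diff): after this patch each kind of TableChange is routed to one of two SessionCatalog calls. An illustrative sketch using the public TableChange factory methods; the column and property names below are made up for the example:

    import org.apache.spark.sql.connector.catalog.TableChange
    import org.apache.spark.sql.types.IntegerType

    // An added column is a ColumnChange, so it is applied via alterTableDataSchema.
    val addColumn = TableChange.addColumn(Array("new_col"), IntegerType)

    // A property change is not a ColumnChange, so it still flows through alterTable,
    // together with comment, owner, and location updates.
    val setProperty = TableChange.setProperty("some.key", "some.value")

    // RenameColumn and DeleteColumn are ColumnChanges but are excluded by the
    // partition above, so they also take the alterTable path.
    val renameColumn = TableChange.renameColumn(Array("old_col"), "new_name")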
V2SessionCatalogSuite.scala
@@ -24,9 +24,10 @@ import java.util.Collections
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchDatabaseException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -35,7 +36,9 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

-abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeAndAfter {
+abstract class V2SessionCatalogBaseSuite extends SparkFunSuite {
+
+  protected def spark: SparkSession

val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String]
val schema: StructType = new StructType()
@@ -55,7 +58,8 @@ abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeA
}
}

-class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
+abstract class V2SessionCatalogTableBaseSuite extends V2SessionCatalogBaseSuite
+  with BeforeAndAfterAll with BeforeAndAfter {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

@@ -64,7 +68,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
val catalog = newCatalog()
catalog.createNamespace(Array("db"), emptyProps)
catalog.createNamespace(Array("db2"),
Map(SupportsNamespaces.PROP_LOCATION -> "file:///db2.db").asJava)
Map(SupportsNamespaces.PROP_LOCATION -> "file:///tmp/db2.db").asJava)
catalog.createNamespace(Array("ns"), emptyProps)
catalog.createNamespace(Array("ns2"), emptyProps)
}
@@ -815,7 +819,11 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
}
}

-class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
+class V2SessionCatalogTableSuite extends V2SessionCatalogTableBaseSuite with SharedSparkSession {
+}
+
+class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite
+  with SharedSparkSession {

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

@@ -895,7 +903,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
test("createNamespace: basic behavior") {
val catalog = newCatalog()

-    val sessionCatalog = sqlContext.sessionState.catalog
+    val sessionCatalog = spark.sqlContext.sessionState.catalog
val expectedPath =
new Path(spark.sessionState.conf.warehousePath,
sessionCatalog.getDefaultDBPath(testNs(0)).toString).toString
HiveExternalV2SessionCatalogTableSuite.scala (new file)
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalogTableBaseSuite
import org.apache.spark.sql.hive.test.TestHiveSingleton


class HiveExternalV2SessionCatalogTableSuite extends V2SessionCatalogTableBaseSuite
  with TestHiveSingleton {

  def excluded: Seq[String] = Seq(
    // Not supported in Hive catalog
    "alterTable: add nested column",
    "createTable: location",
    "alterTable: location",

    // Not supported in V2SessionCatalog
    "alterTable: rename top-level column",
    "alterTable: rename nested column",
    "alterTable: rename struct column",
    "alterTable: multiple changes",
    "alterTable: delete top-level column",
    "alterTable: delete nested column"
  )

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
      (implicit pos: Position): Unit = {
    if (excluded.contains(testName)) ()
    else super.test(testName, testTags: _*)(testFun)
  }
}
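
A note on the exclusion pattern above (editorial, not from the PR): overriding test to return () silently drops excluded tests from the report. A sketch of an alternative, assuming the same ScalaTest base suite, that registers them as ignored so the skips stay visible:

    override protected def test(testName: String, testTags: Tag*)(testFun: => Any)
        (implicit pos: Position): Unit = {
      if (excluded.contains(testName)) {
        // Register as ignored so the skipped test still shows up in reports.
        ignore(testName, testTags: _*)(testFun)
      } else {
        super.test(testName, testTags: _*)(testFun)
      }
    }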
}