Skip to content

[SPARK-16458][SQL] SessionCatalog should support listColumns for temporary tables #14114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
Expand Down Expand Up @@ -253,9 +253,27 @@ class SessionCatalog(
def getTableMetadata(name: TableIdentifier): CatalogTable = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same thing here - update SessionCatalogSuite.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
val tid = TableIdentifier(table)
if (isTemporaryTable(name)) {
CatalogTable(
identifier = tid,
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = tempTables(table).output.map { c =>
CatalogColumn(
name = c.name,
dataType = c.dataType.catalogString,
nullable = c.nullable,
comment = Option(c.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata can contain the comment, but it is a bit of a PITA to get out:
if (c.metadata.contains("comment")) Some(c.metadata.getString("comment")) else None

So I am fine with leaving this as it is...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason we need to put the column name in the comment?

)
},
properties = Map(),
viewText = None)
} else {
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.getTable(db, table)
}
}

/**
Expand Down Expand Up @@ -432,10 +450,10 @@ class SessionCatalog(
def tableExists(name: TableIdentifier): Boolean = synchronized {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you update SessionCatalogSuite to reflect this behavior? I think we weren't checking temp tables in the past.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

val db = formatDatabaseName(name.database.getOrElse(currentDb))
val table = formatTableName(name.table)
if (name.database.isDefined || !tempTables.contains(table)) {
externalCatalog.tableExists(db, table)
if (isTemporaryTable(name)) {
true
} else {
true // it's a temporary table
externalCatalog.tableExists(db, table)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,39 @@ class SessionCatalogSuite extends SparkFunSuite {
assert(catalog.tableExists(TableIdentifier("tbl3")))
}

test("tableExists on temporary views") {
  val catalog = new SessionCatalog(newBasicCatalog())
  val tempView = Range(1, 10, 2, 10)
  val unqualified = TableIdentifier("view1")
  val qualified = TableIdentifier("view1", Some("default"))
  // Before registration, the view is not visible under either name.
  assert(!catalog.tableExists(unqualified))
  assert(!catalog.tableExists(qualified))
  catalog.createTempView("view1", tempView, overrideIfExists = false)
  // A temporary view is only visible through its unqualified name; a
  // database-qualified lookup still goes to the external catalog.
  assert(catalog.tableExists(unqualified))
  assert(!catalog.tableExists(qualified))
}

test("getTableMetadata on temporary views") {
  val catalog = new SessionCatalog(newBasicCatalog())
  val tempView = Range(1, 10, 2, 10)

  // Helper: metadata lookup must fail with the standard "not found" message.
  def assertNotFound(ident: TableIdentifier): Unit = {
    val message = intercept[AnalysisException] {
      catalog.getTableMetadata(ident)
    }.getMessage
    assert(message.contains("Table or view 'view1' not found in database 'default'"))
  }

  // Nothing registered yet: both unqualified and qualified lookups fail.
  assertNotFound(TableIdentifier("view1"))
  assertNotFound(TableIdentifier("view1", Some("default")))

  catalog.createTempView("view1", tempView, overrideIfExists = false)

  // The unqualified name now resolves to the temporary view's metadata.
  val metadata = catalog.getTableMetadata(TableIdentifier("view1"))
  assert(metadata.identifier.table == "view1")
  assert(metadata.schema(0).name == "id")

  // A database-qualified lookup still bypasses temporary views.
  assertNotFound(TableIdentifier("view1", Some("default")))
}

test("list tables without pattern") {
val catalog = new SessionCatalog(newBasicCatalog())
val tempTable = Range(1, 10, 2, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ abstract class Catalog {
def listFunctions(dbName: String): Dataset[Function]

/**
* Returns a list of columns for the given table in the current database.
* Returns a list of columns for the given table in the current database or
* the given temporary table.
*
* @since 2.0.0
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("table does not exist")
override def listColumns(tableName: String): Dataset[Column] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the contract of the listColumns(...) function. It now returns either a temporary view or a table in the current database. We have to document this! What happens when we have temporary table with the same name as a table in the current database?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we check if the table exists? (like the other listColumns(...) function)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you again, @hvanhovell

  1. Yep. That is the purpose of this PR, to make the contract consistent with other APIs.
  2. The existence checking here is redundant because it calls the other listColumns. The callee will check that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documentation in org.apache.spark.sql.catalog.Catalog:

 /**
  * Returns a list of columns for the given table in the current database.
  *
  * @since 2.0.0
  */
  @throws[AnalysisException]("table does not exist")
  def listColumns(tableName: String): Dataset[Column]

Should be updated...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the proper wording, Spark temporary views?

scala> spark.range(10).createOrReplaceTempView("t1")

scala> spark.catalog.listTables().show
+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
|  t1|    null|       null|TEMPORARY|       true|
+----+--------+-----------+---------+-----------+

Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Jul 11, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made too many mistakes. Sorry for that.

I will change it like the following: This includes all temporary tables. It's ambiguous, but it is the current docs.

/**
 * Returns a list of tables in the current database.
 * This includes all temporary tables.
 *
 * @since 2.0.0
 */
def listTables(): Dataset[Table]

listColumns(currentDatabase, tableName)
listColumns(TableIdentifier(tableName, None))
}

/**
Expand All @@ -147,7 +147,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
@throws[AnalysisException]("database or table does not exist")
override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
requireTableExists(dbName, tableName)
val tableMetadata = sessionCatalog.getTableMetadata(TableIdentifier(tableName, Some(dbName)))
listColumns(TableIdentifier(tableName, Some(dbName)))
}

private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
val columns = tableMetadata.schema.map { c =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ class CatalogSuite
testListColumns("tab1", dbName = None)
}

test("list columns in temporary table") {
  // SPARK-16458: listColumns must resolve an unqualified name against
  // temporary views as well. listColumns is declared to throw
  // AnalysisException for missing tables, so merely completing without
  // an exception is the behavior under test here.
  createTempTable("temp1")
  spark.catalog.listColumns("temp1")
}

test("list columns in database") {
createDatabase("db1")
createTable("tab1", Some("db1"))
Expand Down