[SPARK-26982][SQL] Enhance describe framework to describe the output of a query. #23883
Changes from all commits
```diff
@@ -29,12 +29,12 @@ import org.apache.hadoop.fs.{FileContext, FsConstants, Path}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.Histogram
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIdentifier}
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat

@@ -494,6 +494,34 @@ case class TruncateTableCommand(
   }
 }

+abstract class DescribeCommandBase extends RunnableCommand {
+  override val output: Seq[Attribute] = Seq(
+    // Column names are based on Hive.
+    AttributeReference("col_name", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "name of the column").build())(),
+    AttributeReference("data_type", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "data type of the column").build())(),
+    AttributeReference("comment", StringType, nullable = true,
+      new MetadataBuilder().putString("comment", "comment of the column").build())()
+  )
+
+  protected def describeSchema(
+      schema: StructType,
+      buffer: ArrayBuffer[Row],
+      header: Boolean): Unit = {
+    if (header) {
+      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+    }
+    schema.foreach { column =>
+      append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
+    }
+  }
+
+  protected def append(
+      buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
+    buffer += Row(column, dataType, comment)
+  }
+}
+
 /**
  * Command that looks like
  * {{{
```
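To make the shared helper concrete, here is a minimal, self-contained sketch (not part of the PR; the sample schema and object name are invented) that mirrors what `DescribeCommandBase.describeSchema` emits: one `(col_name, data_type, comment)` row per column of a `StructType`.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical standalone mirror of DescribeCommandBase.describeSchema.
object DescribeSchemaSketch {
  def describeSchema(schema: StructType, buffer: ArrayBuffer[Row]): Unit = {
    schema.foreach { column =>
      // simpleString gives the compact SQL type name (e.g. "int", "string");
      // getComment reads the optional "comment" key from the field metadata.
      buffer += Row(column.name, column.dataType.simpleString, column.getComment().orNull)
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("key", IntegerType).withComment("column_comment"),
      StructField("val", StringType)))
    val rows = new ArrayBuffer[Row]
    describeSchema(schema, rows)
    rows.foreach(println)
    // [key,int,column_comment]
    // [val,string,null]
  }
}
```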
```diff
@@ -504,17 +532,7 @@ case class DescribeTableCommand(
     table: TableIdentifier,
     partitionSpec: TablePartitionSpec,
     isExtended: Boolean)
-  extends RunnableCommand {
-
-  override val output: Seq[Attribute] = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("data_type", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = true,
-      new MetadataBuilder().putString("comment", "comment of the column").build())()
-  )
+  extends DescribeCommandBase {

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val result = new ArrayBuffer[Row]

@@ -603,22 +621,31 @@ case class DescribeTableCommand(
       }
       table.storage.toLinkedHashMap.foreach(s => append(buffer, s._1, s._2, ""))
     }
   }
+}

-  private def describeSchema(
-      schema: StructType,
-      buffer: ArrayBuffer[Row],
-      header: Boolean): Unit = {
-    if (header) {
-      append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
-    }
-    schema.foreach { column =>
-      append(buffer, column.name, column.dataType.simpleString, column.getComment().orNull)
-    }
-  }
+/**
+ * Command that looks like
+ * {{{
+ *   DESCRIBE [QUERY] statement
+ * }}}
+ *
+ * Parameter 'statement' can be one of the following types :
+ * 1. SELECT statements
+ * 2. SELECT statements inside set operators (UNION, INTERSECT etc)
+ * 3. VALUES statement.
+ * 4. TABLE statement. Example : TABLE table_name
+ * 5. statements of the form 'FROM table SELECT *'
+ *
+ * TODO : support CTEs.
+ */
+case class DescribeQueryCommand(query: LogicalPlan)
+  extends DescribeCommandBase {

-  private def append(
-      buffer: ArrayBuffer[Row], column: String, dataType: String, comment: String): Unit = {
-    buffer += Row(column, dataType, comment)
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val result = new ArrayBuffer[Row]
+    val queryExecution = sparkSession.sessionState.executePlan(query)
+    describeSchema(queryExecution.analyzed.schema, result, header = false)
+    result
   }
 }
```

Review comment on `4. TABLE statement. Example : TABLE table_name`:

> do you mean we support […]
>
> nvm, just checked and we do support it...
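For context on what the new command does end to end: `run` only asks the session to analyze the plan (`executePlan(query).analyzed`) and then describes the resulting schema, so the described query is never executed. A hedged usage sketch, assuming a Spark build with this patch applied (table and column names are invented):

```scala
import org.apache.spark.sql.SparkSession

object DescQueryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("desc-query-demo")
      .getOrCreate()

    spark.sql("CREATE TABLE desc_demo (key INT, val STRING) USING PARQUET")

    // Only analysis runs here; no Spark job is launched for the aggregate.
    val described = spark.sql(
      "DESC QUERY SELECT key, count(*) AS cnt FROM desc_demo GROUP BY key")
    described.show()
    // Expected rows (one per output column of the described query):
    //   col_name | data_type | comment
    //   key      | int       | null
    //   cnt      | bigint    | null

    spark.stop()
  }
}
```

Because the command reports `queryExecution.analyzed.schema`, resolution errors (for example, a misspelled column name) still surface, but nothing is read from the table.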
```diff
@@ -0,0 +1,27 @@
+-- Test tables
+CREATE table desc_temp1 (key int COMMENT 'column_comment', val string) USING PARQUET;
+CREATE table desc_temp2 (key int, val string) USING PARQUET;
+
+-- Simple Describe query
+DESC SELECT key, key + 1 as plusone FROM desc_temp1;
+DESC QUERY SELECT * FROM desc_temp2;
+DESC SELECT key, COUNT(*) as count FROM desc_temp1 group by key;
+DESC SELECT 10.00D as col1;
+DESC QUERY SELECT key FROM desc_temp1 UNION ALL select CAST(1 AS DOUBLE);
+DESC QUERY VALUES(1.00D, 'hello') as tab1(col1, col2);
+DESC QUERY FROM desc_temp1 a SELECT *;
+
+
+-- Error cases.
+DESC WITH s AS (SELECT 'hello' as col1) SELECT * FROM s;
+DESCRIBE QUERY WITH s AS (SELECT * from desc_temp1) SELECT * FROM s;
+DESCRIBE INSERT INTO desc_temp1 values (1, 'val1');
+DESCRIBE INSERT INTO desc_temp1 SELECT * FROM desc_temp2;
+DESCRIBE
+  FROM desc_temp1 a
+    insert into desc_temp1 select *
+    insert into desc_temp2 select *;
+
+-- cleanup
+DROP TABLE desc_temp1;
+DROP TABLE desc_temp2;
```

Review comment on `DESC QUERY VALUES(1.00D, 'hello') as tab1(col1, col2);`:

> Shall we add […]

Review thread on `DESC WITH s AS (SELECT 'hello' as col1) SELECT * FROM s;`:

> Hi, @dilipbiswal. […]
>
> @dongjoon-hyun Yeah, Wenchen suggested that we start with simple selects and then improve on it. I am planning to look into CTEs next.
>
> Thanks!