[SPARK-39859][SQL][FOLLOWUP] Only get ColStats when isExtended is true in Describe Column

### What changes were proposed in this pull request?
Get ColStats in `DescribeColumnExec` only when `isExtended` is true. The statistics lookup moves from `DataSourceV2Strategy` into `DescribeColumnExec`, so a scan is built and column statistics are fetched only for the extended form of the command.
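
For illustration, a minimal usage sketch (the session, table `t`, and column `col` are hypothetical stand-ins; `t` is assumed to be backed by a DSv2 source whose scan implements `SupportsReportStatistics`):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session and table; `t` and `col` are stand-in names.
val spark = SparkSession.builder().appName("desc-column-demo").getOrCreate()

// The plain form no longer triggers a scan build or a statistics lookup:
spark.sql("DESC TABLE t col").show()

// Only the extended form makes DescribeColumnExec probe the source and, when
// available, report min/max/num_nulls/distinct_count/avg_col_len/max_col_len:
spark.sql("DESC TABLE EXTENDED t col").show()
```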

### Why are the changes needed?
To make the code cleaner and to avoid fetching column statistics when they are not displayed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

Closes #40139 from huaxingao/describe_followup.

Authored-by: huaxingao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huaxingao authored and cloud-fan committed Feb 28, 2023
1 parent a7e61f9 commit cf59158
Showing 2 changed files with 49 additions and 40 deletions.
DataSourceV2Strategy.scala:

```diff
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Column
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
-import org.apache.spark.sql.connector.read.{LocalScan, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read.LocalScan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -340,14 +340,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
       column match {
         case c: Attribute =>
-          val colStats =
-            r.table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
-              case s: SupportsReportStatistics =>
-                val stats = s.estimateStatistics()
-                Some(stats.columnStats().get(FieldReference.column(c.name)))
-              case _ => None
-            }
-          DescribeColumnExec(output, c, isExtended, colStats) :: Nil
+          DescribeColumnExec(output, c, isExtended, r.table) :: Nil
         case nested =>
           throw QueryCompilationErrors.commandNotSupportNestedColumnError(
             "DESC TABLE COLUMN", toPrettySQL(nested))
```
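One nuance visible across the two hunks: the removed planner code went through `r.table.asReadable`, while the exec node (next file) pattern-matches on `SupportsRead` and quietly falls back to no statistics for non-readable tables. A minimal sketch of that guard, with a hypothetical helper name:

```scala
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table}
import org.apache.spark.sql.connector.read.SupportsReportStatistics
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical helper mirroring the guard in DescribeColumnExec: a table can
// report column statistics only if it is readable and its scan implements
// SupportsReportStatistics.
def supportsColumnStats(table: Table): Boolean = table match {
  case read: SupportsRead =>
    read.newScanBuilder(CaseInsensitiveStringMap.empty()).build()
      .isInstanceOf[SupportsReportStatistics]
  case _ => false
}
```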
DescribeColumnExec.scala:

```diff
@@ -22,13 +22,16 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table}
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.connector.read.SupportsReportStatistics
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 case class DescribeColumnExec(
     override val output: Seq[Attribute],
     column: Attribute,
     isExtended: Boolean,
-    colStats: Option[ColumnStatistics] = None) extends LeafV2CommandExec {
+    table: Table) extends LeafV2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     val rows = new ArrayBuffer[InternalRow]()
@@ -44,41 +47,54 @@ case class DescribeColumnExec(
       CharVarcharUtils.getRawType(column.metadata).getOrElse(column.dataType).catalogString)
     rows += toCatalystRow("comment", comment)
 
-    if (isExtended && colStats.nonEmpty) {
-      if (colStats.get.min().isPresent) {
-        rows += toCatalystRow("min", colStats.get.min().toString)
-      } else {
-        rows += toCatalystRow("min", "NULL")
-      }
+    if (isExtended) {
+      val colStats = table match {
+        case read: SupportsRead =>
+          read.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
+            case s: SupportsReportStatistics =>
+              val stats = s.estimateStatistics()
+              Some(stats.columnStats().get(FieldReference.column(column.name)))
+            case _ => None
+          }
+        case _ => None
+      }
 
-      if (colStats.get.max().isPresent) {
-        rows += toCatalystRow("max", colStats.get.max().toString)
-      } else {
-        rows += toCatalystRow("max", "NULL")
-      }
+      if (colStats.nonEmpty) {
+        if (colStats.get.min().isPresent) {
+          rows += toCatalystRow("min", colStats.get.min().toString)
+        } else {
+          rows += toCatalystRow("min", "NULL")
+        }
 
-      if (colStats.get.nullCount().isPresent) {
-        rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString)
-      } else {
-        rows += toCatalystRow("num_nulls", "NULL")
-      }
+        if (colStats.get.max().isPresent) {
+          rows += toCatalystRow("max", colStats.get.max().toString)
+        } else {
+          rows += toCatalystRow("max", "NULL")
+        }
 
-      if (colStats.get.distinctCount().isPresent) {
-        rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString)
-      } else {
-        rows += toCatalystRow("distinct_count", "NULL")
-      }
+        if (colStats.get.nullCount().isPresent) {
+          rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString)
+        } else {
+          rows += toCatalystRow("num_nulls", "NULL")
+        }
 
-      if (colStats.get.avgLen().isPresent) {
-        rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString)
-      } else {
-        rows += toCatalystRow("avg_col_len", "NULL")
-      }
+        if (colStats.get.distinctCount().isPresent) {
+          rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString)
+        } else {
+          rows += toCatalystRow("distinct_count", "NULL")
+        }
 
-      if (colStats.get.maxLen().isPresent) {
-        rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString)
-      } else {
-        rows += toCatalystRow("max_col_len", "NULL")
+        if (colStats.get.avgLen().isPresent) {
+          rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString)
+        } else {
+          rows += toCatalystRow("avg_col_len", "NULL")
+        }
+
+        if (colStats.get.maxLen().isPresent) {
+          rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString)
+        } else {
+          rows += toCatalystRow("max_col_len", "NULL")
+        }
       }
     }
```
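
For context, a sketch of the connector side that `DescribeColumnExec` now probes at run time. Everything here (`DemoScan`, the schema, the constant values) is hypothetical; only the `SupportsReportStatistics` and `ColumnStatistics` interfaces come from the Spark DSv2 API, and the unimplemented `ColumnStatistics` methods are assumed to keep their empty interface defaults:

```scala
import java.util.{Collections, Optional, OptionalLong}

import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.connector.read.{Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
import org.apache.spark.sql.types.StructType

// Hypothetical scan for a single bigint column "id".
class DemoScan extends Scan with SupportsReportStatistics {
  override def readSchema(): StructType = new StructType().add("id", "bigint")

  // DescribeColumnExec calls estimateStatistics() and then looks the column
  // up via columnStats().get(FieldReference.column(name)).
  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(800L)  // made-up value
    override def numRows(): OptionalLong = OptionalLong.of(100L)      // made-up value
    override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
      val idStats = new ColumnStatistics {
        override def min(): Optional[AnyRef] = Optional.of[AnyRef](Long.box(0L))
        override def max(): Optional[AnyRef] = Optional.of[AnyRef](Long.box(99L))
        override def nullCount(): OptionalLong = OptionalLong.of(0L)
        override def distinctCount(): OptionalLong = OptionalLong.of(100L)
        // avgLen()/maxLen() are left to the (assumed) empty defaults, so
        // DESC EXTENDED would print NULL for avg_col_len and max_col_len.
      }
      Collections.singletonMap[NamedReference, ColumnStatistics](
        FieldReference.column("id"), idStats)
    }
  }
}
```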
