diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index dd76aee2f187..9cdb15092b08 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -309,7 +309,7 @@ class ColumnStatsIndexSupport(spark: SparkSession,
     colStatsDF.select(targetColumnStatsIndexColumns.map(col): _*)
   }
 
-  private def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
+  def loadColumnStatsIndexRecords(targetColumns: Seq[String], shouldReadInMemory: Boolean): HoodieData[HoodieMetadataColumnStats] = {
     // Read Metadata Table's Column Stats Index records into [[HoodieData]] container by
     //    - Fetching the records from CSI by key-prefixes (encoded column names)
     //    - Extracting [[HoodieMetadataColumnStats]] records
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index ad63ddbb29ee..1a960ecb8fd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -66,6 +66,7 @@ object HoodieProcedures {
       ,(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
       ,(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
       ,(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
+      ,(ShowMetadataTableColumnStatsProcedure.NAME, ShowMetadataTableColumnStatsProcedure.builder)
       ,(ShowMetadataTableFilesProcedure.NAME, ShowMetadataTableFilesProcedure.builder)
       ,(ShowMetadataTablePartitionsProcedure.NAME, ShowMetadataTablePartitionsProcedure.builder)
       ,(CreateMetadataTableProcedure.NAME, CreateMetadataTableProcedure.builder)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala
new file mode 100644
index 000000000000..60aa0f054b9c
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowMetadataTableColumnStatsProcedure.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.avro.generic.IndexedRecord
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.avro.model._
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.timeline.{HoodieDefaultTimeline, HoodieInstant}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.metadata.HoodieTableMetadata
+import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util
+import java.util.function.{Function, Supplier}
+import scala.collection.{JavaConversions, mutable}
+import scala.jdk.CollectionConverters.{asScalaBufferConverter, asScalaIteratorConverter}
+
+
+class ShowMetadataTableColumnStatsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "partition", DataTypes.StringType),
+    ProcedureParameter.optional(2, "targetColumns", DataTypes.StringType)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("column_name", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("min_value", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("max_value", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("null_num", DataTypes.LongType, nullable = true, Metadata.empty)
+  ))
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+    val partitions = getArgValueOrDefault(args, PARAMETERS(1)).getOrElse("").toString
+    val partitionsSeq = partitions.split(",").filter(_.nonEmpty).toSeq
+
+    val targetColumns = getArgValueOrDefault(args, PARAMETERS(2)).getOrElse("").toString
+    val targetColumnsSeq = targetColumns.split(",").toSeq
+    val basePath = getBasePath(table)
+    val metadataConfig = HoodieMetadataConfig.newBuilder
+      .enable(true)
+      .build
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val schema = AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
+    val columnStatsIndex = new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
+    val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = columnStatsIndex.loadColumnStatsIndexRecords(targetColumnsSeq, shouldReadInMemory = false)
+    val fsView = buildFileSystemView(table)
+    val allFileSlices: Set[FileSlice] = {
+      if (partitionsSeq.isEmpty) {
+        val engineCtx = new HoodieSparkEngineContext(jsc)
+        val metaTable = HoodieTableMetadata.create(engineCtx, metadataConfig, basePath)
+        metaTable.getAllPartitionPaths
+          .asScala
+          .flatMap(path => fsView.getLatestFileSlices(path).iterator().asScala)
+          .toSet
+      } else {
+        partitionsSeq
+          .flatMap(partition => fsView.getLatestFileSlices(partition).iterator().asScala)
+          .toSet
+      }
+    }
+
+    val allFileNames: Set[String] = allFileSlices.map(_.getBaseFile.get().getFileName)
+
+    val rows = mutable.ListBuffer[Row]()
+    colStatsRecords.collectAsList().asScala
+      .filter(c => allFileNames.contains(c.getFileName))
+      .foreach { c =>
+        rows += Row(c.getFileName, c.getColumnName, getColumnStatsValue(c.getMinValue),
+          getColumnStatsValue(c.getMaxValue), c.getNullCount.longValue())
+      }
+
+    rows.toList
+  }
+
+  private def getColumnStatsValue(stats_value: Any): String = {
+    stats_value match {
+      case _: IntWrapper |
+           _: BooleanWrapper |
+           _: DateWrapper |
+           _: DoubleWrapper |
+           _: FloatWrapper |
+           _: LongWrapper |
+           _: StringWrapper |
+           _: TimeMicrosWrapper |
+           _: TimestampMicrosWrapper =>
+        String.valueOf(stats_value.asInstanceOf[IndexedRecord].get(0))
+      case _: BytesWrapper =>
+        val bytes_value = stats_value.asInstanceOf[BytesWrapper].getValue
+        util.Arrays.toString(bytes_value.array())
+      case _: DecimalWrapper =>
+        val decimal_value = stats_value.asInstanceOf[DecimalWrapper].getValue
+        util.Arrays.toString(decimal_value.array())
+      case _ =>
+        throw new HoodieException(s"Unsupported type: ${stats_value.getClass.getSimpleName}")
+    }
+  }
+
+  def buildFileSystemView(table: Option[Any]): HoodieTableFileSystemView = {
+    val basePath = getBasePath(table)
+    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
+    val fs = metaClient.getFs
+    val globPath = s"$basePath/*/*/*"
+    val statuses = FSUtils.getGlobStatusExcludingMetaFolder(fs, new Path(globPath))
+
+    val timeline = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants()
+
+    val maxInstant = metaClient.createNewInstantTime()
+    val instants = timeline.getInstants.iterator().asScala.filter(_.getTimestamp < maxInstant)
+
+    val details = new Function[HoodieInstant, org.apache.hudi.common.util.Option[Array[Byte]]]
+      with java.io.Serializable {
+      override def apply(instant: HoodieInstant): HOption[Array[Byte]] = {
+        metaClient.getActiveTimeline.getInstantDetails(instant)
+      }
+    }
+
+    val filteredTimeline = new HoodieDefaultTimeline(
+      new java.util.ArrayList[HoodieInstant](JavaConversions.asJavaCollection(instants.toList)).stream(), details)
+
+    new HoodieTableFileSystemView(metaClient, filteredTimeline, statuses.toArray(new Array[FileStatus](statuses.size)))
+  }
+
+  override def build: Procedure = new ShowMetadataTableColumnStatsProcedure()
+}
+
+object ShowMetadataTableColumnStatsProcedure {
+  val NAME = "show_metadata_table_column_stats"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new ShowMetadataTableColumnStatsProcedure()
+  }
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
index c618c227ce1d..b3ce71c70eb9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestMetadataProcedure.scala
@@ -91,6 +91,72 @@ class TestMetadataProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call show_metadata_table_column_stats Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | c1 int,
+           | c2 boolean,
+           | c3 binary,
+           | c4 date,
+           | c5 decimal(10,1),
+           | c6 double,
+           | c7 float,
+           | c8 long,
+           | c9 string,
+           | c10 timestamp
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  primaryKey = 'c1',
+           |  preCombineField = 'c8',
+           |  hoodie.metadata.enable="true",
+           |  hoodie.metadata.index.column.stats.enable="true"
+           | )
+       """.stripMargin)
+      // insert data to table
+
+      spark.sql(
+        s"""
+           |insert into table $tableName
+           |values (1, true, CAST('binary data' AS BINARY), CAST('2021-01-01' AS DATE), CAST(10.5 AS DECIMAL(10,1)), CAST(3.14 AS DOUBLE), CAST(2.5 AS FLOAT), 1000, 'example string', CAST('2021-01-01 00:00:00' AS TIMESTAMP))
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |insert into table $tableName
+           |values (10, false, CAST('binary data' AS BINARY), CAST('2022-02-02' AS DATE), CAST(20.5 AS DECIMAL(10,1)), CAST(6.28 AS DOUBLE), CAST(3.14 AS FLOAT), 2000, 'another string', CAST('2022-02-02 00:00:00' AS TIMESTAMP))
+           |""".stripMargin)
+
+      // Only numerical and string types are compared for clarity on min/max values.
+      val expectedValues = Map(
+        1 -> ("1", "10"),
+        2 -> ("false", "true"),
+        6 -> ("3.14", "6.28"),
+        7 -> ("2.5", "3.14"),
+        8 -> ("1000", "2000"),
+        9 -> ("another string", "example string")
+      )
+
+      for (i <- 1 to 10) {
+        val columnName = s"c$i"
+        val metadataStats = spark.sql(s"""call show_metadata_table_column_stats(table => '$tableName', targetColumns => '$columnName')""").collect()
+        assertResult(1)(metadataStats.length)
+        val minVal: String = metadataStats(0).getAs[String]("min_value")
+        val maxVal: String = metadataStats(0).getAs[String]("max_value")
+
+        expectedValues.get(i) match {
+          case Some((expectedMin, expectedMax)) =>
+            assertResult(expectedMin)(minVal)
+            assertResult(expectedMax)(maxVal)
+          case None => // Do nothing if no expected values found
+        }
+      }
+    }
+  }
+
   test("Test Call show_metadata_table_stats Procedure") {
     withTempDir { tmp =>
       val tableName = generateTableName
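For reference, a minimal usage sketch of the procedure introduced above (not part of the patch itself). It assumes a Hudi table written with hoodie.metadata.enable and hoodie.metadata.index.column.stats.enable set to true, as in the test; the table name 'hudi_table' and the partition path below are placeholders, and the call mirrors the pattern used in TestMetadataProcedure:

  // per-file min/max/null-count stats for the selected columns across all partitions
  spark.sql("call show_metadata_table_column_stats(table => 'hudi_table', targetColumns => 'c1,c9')").show(false)

  // the optional 'partition' argument takes a comma-separated list of partition paths to narrow the matched file slices
  spark.sql("call show_metadata_table_column_stats(table => 'hudi_table', partition => '2021/01/01', targetColumns => 'c1')").show(false)

Each returned row carries file_name, column_name, min_value, max_value and null_num, restricted to base files of the latest file slices in the selected partitions.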