From 79b5fd3c5f05f92374b6acc7161a69bce9f82939 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Tue, 7 Mar 2023 07:58:51 -0800 Subject: [PATCH] add statistics Signed-off-by: Peng Huo --- .../sql/DeltaSparkSessionExtension.scala | 12 +- .../spark/sql/delta/DeltaColumnMapping.scala | 72 ++ .../apache/spark/sql/delta/DeltaErrors.scala | 42 +- .../spark/sql/delta/DeltaOperations.scala | 12 +- .../org/apache/spark/sql/delta/DeltaUDF.scala | 44 + .../sql/delta/OptimisticTransaction.scala | 84 +- .../spark/sql/delta/PartitionFiltering.scala | 2 +- .../org/apache/spark/sql/delta/Snapshot.scala | 11 +- .../commands/CreateDeltaTableCommand.scala | 71 +- .../spark/sql/delta/implicits/package.scala | 114 +++ .../sql/delta/metering/DeltaLogging.scala | 10 + .../sql/delta/schema/SchemaMergingUtils.scala | 90 ++ .../stats/DataSkippingPredicateBuilder.scala | 109 ++ .../sql/delta/stats/DataSkippingReader.scala | 929 ++++++++++++++++++ .../spark/sql/delta/stats/DeltaScan.scala | 62 +- .../sql/delta/stats/DeltaScanGenerator.scala | 39 + .../sql/delta/stats/PrepareDeltaScan.scala | 325 ++++++ .../delta/stats/StatisticsCollection.scala | 298 ++++++ .../spark/sql/delta/stats/StatsProvider.scala | 104 ++ .../sql/delta/stats/UsesMetadataFields.scala | 44 + 20 files changed, 2367 insertions(+), 107 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingPredicateBuilder.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScanGenerator.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/StatsProvider.scala create mode 100644 core/src/main/scala/org/apache/spark/sql/delta/stats/UsesMetadataFields.scala diff --git a/core/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala b/core/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala index 55509696a66..d6c66273109 100644 --- a/core/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala +++ b/core/src/main/scala/io/delta/sql/DeltaSparkSessionExtension.scala @@ -16,11 +16,10 @@ package io.delta.sql -import org.apache.spark.sql.delta._ import io.delta.sql.parser.DeltaSqlParser - import org.apache.spark.sql.SparkSessionExtensions -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.stats.PrepareDeltaScan /** * An extension for Spark SQL to activate Delta SQL parser to support Delta SQL grammar. @@ -90,8 +89,11 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) { extensions.injectPostHocResolutionRule { session => new PreprocessTableDelete(session.sessionState.conf) } - extensions.injectOptimizerRule { session => - new ActiveOptimisticTransactionRule(session) + // We don't use `injectOptimizerRule` here as we won't want to apply further optimizations after + // `PrepareDeltaScan`. 
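For reference, the extension that carries the injected `PrepareDeltaScan` rule is activated in the usual way when the session is built. A minimal sketch, with a placeholder master and app name:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")            // placeholder
  .appName("delta-stats-demo")   // placeholder
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

With the extension installed, the rule registered below runs as a pre-CBO rule, so no further optimizer rules rewrite the plan after the Delta scan has been prepared.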
+ // For example, `ConstantFolding` will break unit tests in `OptimizeGeneratedColumnSuite`. + extensions.injectPreCBORule { session => + new PrepareDeltaScan(session) } } } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala new file mode 100644 index 00000000000..6aa5cf824da --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -0,0 +1,72 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.actions.Protocol +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.types.StructField + +import java.util.{Locale, UUID} + +trait DeltaColumnMappingBase extends DeltaLogging { + val MIN_WRITER_VERSION = 5 + val MIN_READER_VERSION = 2 + val MIN_PROTOCOL_VERSION = Protocol(MIN_READER_VERSION, MIN_WRITER_VERSION) + + val PARQUET_FIELD_ID_METADATA_KEY = "parquet.field.id" + val COLUMN_MAPPING_METADATA_PREFIX = "delta.columnMapping." + val COLUMN_MAPPING_METADATA_ID_KEY = COLUMN_MAPPING_METADATA_PREFIX + "id" + val COLUMN_MAPPING_PHYSICAL_NAME_KEY = COLUMN_MAPPING_METADATA_PREFIX + "physicalName" + + def generatePhysicalName: String = "col-" + UUID.randomUUID() + + def getPhysicalName(field: StructField): String = { + if (field.metadata.contains(COLUMN_MAPPING_PHYSICAL_NAME_KEY)) { + field.metadata.getString(COLUMN_MAPPING_PHYSICAL_NAME_KEY) + } else { + field.name + } + } +} + +object DeltaColumnMapping extends DeltaColumnMappingBase + +/** + * A trait for Delta column mapping modes. + */ +sealed trait DeltaColumnMappingMode { + def name: String +} + +/** + * No mapping mode uses a column's display name as its true identifier to + * read and write data. + * + * This is the default mode and is the same mode as Delta always has been. 
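To illustrate the fallback behaviour of `getPhysicalName` defined above, a small sketch (the field name and the `col-ab12` value are made up):

import org.apache.spark.sql.delta.DeltaColumnMapping
import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

val plain = StructField("customer_id", StringType)
val mapped = StructField("customer_id", StringType, nullable = true,
  metadata = new MetadataBuilder()
    .putString("delta.columnMapping.physicalName", "col-ab12")
    .build())

// Without mapping metadata the display name is returned; with the
// physicalName key present, the stored physical name wins.
assert(DeltaColumnMapping.getPhysicalName(plain) == "customer_id")
assert(DeltaColumnMapping.getPhysicalName(mapped) == "col-ab12")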
+ */ +case object NoMapping extends DeltaColumnMappingMode { + val name = "none" +} + +object DeltaColumnMappingMode { + def apply(name: String): DeltaColumnMappingMode = { + name.toLowerCase(Locale.ROOT) match { + case NoMapping.name => NoMapping + case mode => throw DeltaErrors.unsupportedColumnMappingMode(mode) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index b68fb7df928..38ac6b77bb0 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine -import java.io.{FileNotFoundException, IOException} -import java.util.ConcurrentModificationException - -import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol} +import io.delta.sql.DeltaSparkSessionExtension +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata} import org.apache.spark.sql.delta.catalog.DeltaCatalog import org.apache.spark.sql.delta.constraints.Constraints import org.apache.spark.sql.delta.hooks.PostCommitHook @@ -28,20 +31,14 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{InvariantViolationException, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.JsonUtils -import io.delta.sql.DeltaSparkSessionExtension -import org.apache.hadoop.fs.Path - -import org.apache.spark.{SparkConf, SparkEnv} -import org.apache.spark.sql.{AnalysisException, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.{SparkConf, SparkEnv} + +import java.io.{FileNotFoundException, IOException} +import java.util.ConcurrentModificationException trait DocsPath { @@ -122,6 +119,14 @@ object DeltaErrors |get deleted based on retention settings. """.stripMargin + /** + * We have plans to support more column mapping modes, but they are not implemented yet, + * so we error for now to be forward compatible with tables created in the future. + */ + def unsupportedColumnMappingMode(mode: String): Throwable = + new ColumnMappingUnsupportedException(s"The column mapping mode `$mode` is " + + s"not supported for this Delta version. 
Please upgrade if you want to use this mode.") + def deltaSourceIgnoreDeleteError(version: Long, removedFile: String): Throwable = { new UnsupportedOperationException( s"Detected deleted data (for example $removedFile) from streaming source at " + @@ -1400,3 +1405,10 @@ class MetadataMismatchErrorBuilder { throw new AnalysisException(bits.mkString("\n")) } } + + +/** Errors thrown around column mapping. */ +class ColumnMappingUnsupportedException(msg: String) + extends UnsupportedOperationException(msg) +case class ColumnMappingException(msg: String, mode: DeltaColumnMappingMode) + extends AnalysisException(msg) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala index 767072b0095..d3a6c4f3aa6 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala @@ -16,14 +16,11 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.actions.{Metadata, Protocol} -import org.apache.spark.sql.delta.constraints.Constraint -import org.apache.spark.sql.delta.util.JsonUtils - import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute -import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause +import org.apache.spark.sql.delta.actions.{Metadata, Protocol} +import org.apache.spark.sql.delta.util.JsonUtils import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{StructField, StructType} @@ -316,6 +313,11 @@ object DeltaOperations { } } + /** Recorded when recomputing stats on the table. */ + case class ComputeStats(predicate: Seq[String]) extends Operation("COMPUTE STATS") { + override val parameters: Map[String, Any] = Map( + "predicate" -> JsonUtils.toJson(predicate)) + } private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = { Map( diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala new file mode 100644 index 00000000000..c2aaede2bce --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala @@ -0,0 +1,44 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedFunction} +import org.apache.spark.sql.functions.udf + +object DeltaUDF { + + /** + * A template for String => String udfs. It's used to create `SparkUserDefinedFunction` for + * String => String udfs without touching Scala Reflection to reduce the log contention. 
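For context, a sketch of how a `String => String` UDF built through this helper would be applied to a column, assuming an active `SparkSession`:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaUDF
import org.apache.spark.sql.functions.col

val spark = SparkSession.active
import spark.implicits._

// Wrap a plain Scala function and use it like any other Spark UDF.
val upper = DeltaUDF.stringStringUdf(_.toUpperCase)
Seq("alpha", "bravo").toDF("value")
  .select(upper(col("value")).as("upper_value"))
  .show()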
+ */ + private lazy val stringStringUdfTemplate = + udf[String, String]((x: String) => x).asInstanceOf[SparkUserDefinedFunction] + + private def createUdfFromTemplate[R, T]( + template: SparkUserDefinedFunction, + f: T => R): UserDefinedFunction = { + template.copy( + f = f, + inputEncoders = template.inputEncoders.map(_.map(_.copy())), + outputEncoder = template.outputEncoder.map(_.copy()) + ) + } + + def stringStringUdf(f: String => String): UserDefinedFunction = { + udf[String, String](f) + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 2f56ac0d0a2..2b771a6fc22 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -16,16 +16,9 @@ package org.apache.spark.sql.delta -import java.net.URI -import java.nio.file.FileAlreadyExistsException -import java.util.{ConcurrentModificationException, Locale} -import java.util.concurrent.TimeUnit.NANOSECONDS - -import scala.collection.mutable -import scala.collection.mutable.{ArrayBuffer, HashSet} -import scala.util.control.NonFatal - import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.delta.DeltaOperations.Operation import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.files._ @@ -33,14 +26,17 @@ import org.apache.spark.sql.delta.hooks.{GenerateSymlinkManifest, PostCommitHook import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.hadoop.fs.Path - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession} -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter +import org.apache.spark.sql.delta.stats.{DeltaScan, DeltaScanGenerator} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} import org.apache.spark.util.{Clock, Utils} +import java.nio.file.FileAlreadyExistsException +import java.util.ConcurrentModificationException +import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.collection.mutable +import scala.collection.mutable.{ArrayBuffer, HashSet} +import scala.util.control.NonFatal + /** Record metrics about a successful commit. */ case class CommitStats( /** The version read by the txn when it starts. */ @@ -152,6 +148,7 @@ object OptimisticTransaction { * This trait is not thread-safe. */ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReporting + with DeltaScanGenerator with DeltaLogging { import org.apache.spark.sql.delta.util.FileNames._ @@ -187,6 +184,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport /** Stores the updated protocol (if any) that will result from this txn. */ protected var newProtocol: Option[Protocol] = None + override val snapshotToScan: Snapshot = snapshot + /** + * Tracks the first-access snapshots of other Delta logs read by this transaction. + * The snapshots are keyed by the log's unique id. 
+ */ + protected var readSnapshots = new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot] + protected val txnStartNano = System.nanoTime() protected var commitStartNano = -1L protected var commitInfo: CommitInfo = _ @@ -218,6 +222,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport /** Start time of txn in nanoseconds */ def txnStartTimeNs: Long = txnStartNano + /** Gets the stats collector for the table at the snapshot this transaction has. */ + def statsCollector: Column = snapshot.statsCollector + /** * Returns the metadata for this transaction. The metadata refers to the metadata of the snapshot * at the transaction's read version unless updated during the transaction. @@ -363,6 +370,48 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport } } + /** + * Returns the [[DeltaScanGenerator]] for the given log, which will be used to generate + * [[DeltaScan]]s. Every time this method is called on a log, the returned generator + * generator will read a snapshot that is pinned on the first access for that log. + * + * Internally, if the given log is the same as the log associated with this + * transaction, then it returns this transaction, otherwise it will return a snapshot of + * given log + */ + def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = { + if (index.deltaLog.isSameLogAs(deltaLog)) { + this + } else { + if (spark.conf.get(DeltaSQLConf.DELTA_SNAPSHOT_ISOLATION)) { + readSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => { + // Will be called only when the log is accessed the first time + index.getSnapshot + }) + } else { + index.getSnapshot + } + } + } + + /** Returns a[[DeltaScan]] based on the given filters and projections. */ + override def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = { + val scan = snapshot.filesForScan(projection, filters) + val partitionFilters = filters.filter { f => + DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark) + } + readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true)) + readFiles ++= scan.files + scan + } + + override def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame = { + val metadata = snapshot.filesWithStatsForScan(partitionFilters) + readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true)) + withFilesRead(filterFiles(partitionFilters)) + metadata + } + /** Returns files matching the given predicates. */ def filterFiles(): Seq[AddFile] = filterFiles(Seq(Literal.TrueLiteral)) @@ -383,6 +432,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport readTheWholeTable = true } + /** Mark the given files as read within this transaction. */ + def withFilesRead(files: Seq[AddFile]): Unit = { + readFiles ++= files + } + /** * Returns the latest version that has committed for the idempotent transaction with given `id`. 
*/ diff --git a/core/src/main/scala/org/apache/spark/sql/delta/PartitionFiltering.scala b/core/src/main/scala/org/apache/spark/sql/delta/PartitionFiltering.scala index 48e55e5f905..84571c1c79a 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/PartitionFiltering.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/PartitionFiltering.scala @@ -38,6 +38,6 @@ trait PartitionFiltering { self: Snapshot => partitionFilters).as[AddFile].collect() } - DeltaScan(version = version, files, null, null, null)(null, null, null, null) + DeltaScan(version = version, files, null, null, null)(null, null, null, null, 0, null) } } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 042833ff958..33a6acde402 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -18,9 +18,7 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import java.net.URI - import scala.collection.mutable - import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.actions.Action.logSchema import org.apache.spark.sql.delta.metering.DeltaLogging @@ -28,9 +26,9 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.StateCache import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} - import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.delta.stats.{DataSkippingReader, StatisticsCollection} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions._ @@ -63,6 +61,8 @@ class Snapshot( extends StateCache with PartitionFiltering with DeltaFileFormat + with StatisticsCollection + with DataSkippingReader with DeltaLogging { import Snapshot._ @@ -71,6 +71,8 @@ class Snapshot( protected def spark = SparkSession.active + /** Snapshot to scan by the DeltaScanGenerator for metadata query optimizations */ + override val snapshotToScan: Snapshot = this protected def getNumPartitions: Int = { spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_PARTITIONS) @@ -206,6 +208,9 @@ class Snapshot( /** Returns the schema of the table. */ def schema: StructType = metadata.schema + /** Returns the data schema of the table, used for reading stats */ + def tableDataSchema: StructType = metadata.dataSchema + /** Returns the data schema of the table, the schema of the columns written out to file. 
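Because the snapshot now mixes in `StatisticsCollection` and `DataSkippingReader`, the per-file statistics it relies on can be inspected straight from the log state. A sketch, with a made-up table path and an active `SparkSession`:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.active
val snapshot = DeltaLog.forTable(spark, "/tmp/delta/events").snapshot

// Raw JSON stats as written into the log, one row per AddFile.
snapshot.allFiles.select("path", "stats").show(truncate = false)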
*/ def dataSchema: StructType = metadata.dataSchema diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index dde9733dc01..55b31298a4e 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.delta.commands // scalastyle:off import.ordering.noEmptyLine import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.expressions.Attribute @@ -31,8 +30,9 @@ import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.{DeltaFileOperations, SerializableFileStatus} import org.apache.spark.sql.execution.command.RunnableCommand -import org.apache.spark.sql.functions.input_file_name +import org.apache.spark.sql.functions.{input_file_name, to_json} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, _} import org.apache.spark.util.SerializableConfiguration import java.io.Closeable @@ -195,10 +195,7 @@ case class CreateDeltaTableCommand( val convertTarget = ConvertTarget(table, tableLocation, Map.empty[String, String]) if ("false".equalsIgnoreCase(table.properties.getOrElse("auto_refresh", "false"))) { - logWarning(s"one time refresh") - deltaLog.withNewTransaction { t => - performConvert(sparkSession, t, convertTarget, Option.empty) - } + logError(s"one time refresh is not supported") } else { logWarning(s"auto refresh") autoRefresh(sparkSession, deltaLog, table, convertTarget) @@ -465,25 +462,19 @@ case class CreateDeltaTableCommand( def myFunc(batchDF: DataFrame, batchID: Long): Unit = { deltaLog.withNewTransaction { txn => { logWarning(s"=== Refresh with files ===") - val paths = batchDF.collect().map(row => { - val path = row(0).toString - logWarning(s"New file: $path") - new Path(path) - }) - performConvert(spark, txn, convertProperties, Option(paths)) + performConvert(spark, txn, convertProperties, batchDF) } } - /** * TODO: refresh all index/MV in current Delta table metadata */ - logWarning("=== Refreshing index ===") - val tableName = table.qualifiedName - val indexes = spark.sql(s"SHOW INDEXES ON $tableName") - for (index <- indexes.select("Name").collect.map(_(0))) { - logWarning(s"Index: $index") - spark.sql(s"REFRESH INDEX $index ON $tableName") - } +// logWarning("=== Refreshing index ===") +// val tableName = table.qualifiedName +// val indexes = spark.sql(s"SHOW INDEXES ON $tableName") +// for (index <- indexes.select("Name").collect.map(_(0))) { +// logWarning(s"Index: $index") +// spark.sql(s"REFRESH INDEX $index ON $tableName") +// } } val streamDF = spark.readStream @@ -492,7 +483,6 @@ case class CreateDeltaTableCommand( .option("path", convertProperties.targetDir.toString) .load() streamDF - .select(input_file_name()) .writeStream .foreachBatch(myFunc _) .start() @@ -503,17 +493,20 @@ case class CreateDeltaTableCommand( * entire list of files. 
*/ protected def createDeltaActions( - spark: SparkSession, - manifest: FileManifest, + df: DataFrame, txn: OptimisticTransaction, fs: FileSystem ): Iterator[AddFile] = { - val statsBatchSize = - conf.getConf(DeltaSQLConf.DELTA_IMPORT_BATCH_SIZE_STATS_COLLECTION) - manifest.getFiles.grouped(statsBatchSize).flatMap { batch => - val adds = batch.map(createAddFile(_, txn.deltaLog.dataPath, fs)) - adds.toIterator - } + val filesWithStats = df.groupBy(input_file_name()) + .agg(to_json(txn.snapshot.statsCollector)) + filesWithStats.collect().iterator.map { row => { + val fileName = row.getString(0) + logWarning(s"New file: $fileName") + val addFile = createAddFile( + SerializableFileStatus.fromStatus(fs.getFileStatus(new Path(fileName))), + txn.deltaLog.dataPath, fs) + addFile.copy(stats = row.getString(1)) + }} } /** @@ -528,7 +521,7 @@ case class CreateDeltaTableCommand( spark: SparkSession, txn: OptimisticTransaction, convertProperties: ConvertTarget, - files: Option[Array[Path]] + df: DataFrame ): Seq[Row] = recordDeltaOperation(txn.deltaLog, "delta.convert") { @@ -541,21 +534,11 @@ case class CreateDeltaTableCommand( if (!fs.exists(qualifiedPath)) { throw DeltaErrors.pathNotExistsException(qualifiedDir) } - val serializableConfiguration = - new SerializableConfiguration(sessionHadoopConf) - - val manifest = files.map(new ProvidedFileManifest(qualifiedDir, fs, _)) - .getOrElse(new ManualListingFileManifest(spark, qualifiedDir, serializableConfiguration)) try { - val initialList = manifest.getFiles - if (!initialList.hasNext) { - throw DeltaErrors.emptyDirectoryException(qualifiedDir) - } - - val numFiles = initialList.size + val numFiles = 0 - val addFilesIter = createDeltaActions(spark, manifest, txn, fs) + val addFilesIter = createDeltaActions(df, txn, fs) val metrics = Map[String, String]( "numConvertedFiles" -> numFiles.toString ) @@ -564,12 +547,12 @@ case class CreateDeltaTableCommand( spark, txn, Iterator.single(txn.protocol) ++ addFilesIter, - getOperation(numFiles, convertProperties), + getOperation(0, convertProperties), getContext, metrics ) } finally { - manifest.close() + // do nothing } Seq.empty[Row] diff --git a/core/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala b/core/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala new file mode 100644 index 00000000000..0b081948c7c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/implicits/package.scala @@ -0,0 +1,114 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} + +package object implicits { + + /** + * This implicit class is used to provide helpful methods used throughout the code that are not + * provided by Spark-Catalyst's StructType. + */ + implicit class RichStructType(structType: StructType) { + + /** + * Returns a field in this struct and its child structs, case insensitively. 
+ * + * If includeCollections is true, this will return fields that are nested in maps and arrays. + * + * @param fieldNames The path to the field, in order from the root. For example, the column + * nested.a.b.c would be Seq("nested", "a", "b", "c"). + */ + def findNestedFieldIgnoreCase( + fieldNames: Seq[String], + includeCollections: Boolean = false): Option[StructField] = { + val fieldOption = fieldNames.headOption.flatMap { + fieldName => structType.find(_.name.equalsIgnoreCase(fieldName)) + } + fieldOption match { + case Some(field) => + (fieldNames.tail, field.dataType, includeCollections) match { + case (Seq(), _, _) => + Some(field) + + case (names, struct: StructType, _) => + struct.findNestedFieldIgnoreCase(names, includeCollections) + + case (_, _, false) => + None // types nested in maps and arrays are not used + + case (Seq("key"), MapType(keyType, _, _), true) => + // return the key type as a struct field to include nullability + Some(StructField("key", keyType, nullable = false)) + + case (Seq("key", names @ _*), MapType(struct: StructType, _, _), true) => + struct.findNestedFieldIgnoreCase(names, includeCollections) + + case (Seq("value"), MapType(_, valueType, isNullable), true) => + // return the value type as a struct field to include nullability + Some(StructField("value", valueType, nullable = isNullable)) + + case (Seq("value", names @ _*), MapType(_, struct: StructType, _), true) => + struct.findNestedFieldIgnoreCase(names, includeCollections) + + case (Seq("element"), ArrayType(elementType, isNullable), true) => + // return the element type as a struct field to include nullability + Some(StructField("element", elementType, nullable = isNullable)) + + case (Seq("element", names @ _*), ArrayType(struct: StructType, _), true) => + struct.findNestedFieldIgnoreCase(names, includeCollections) + + case _ => + None + } + case _ => + None + } + } + } + +// /** +// * This implicit class is used to provide helpful methods used throughout the code that are not +// * provided by Spark-Catalyst's LogicalPlan. +// */ +// implicit class RichLogicalPlan(plan: LogicalPlan) { +// /** +// * Returns the result of running QueryPlan.transformExpressionsUpWithPruning on this node +// * and all its children. +// */ +// def transformAllExpressionsUpWithPruning( +// cond: TreePatternBits => Boolean, +// ruleId: RuleId = UnknownRuleId)( +// rule: PartialFunction[Expression, Expression] +// ): LogicalPlan = { +// plan.transformUpWithPruning(cond, ruleId) { +// case q: QueryPlan[_] => +// q.transformExpressionsUpWithPruning(cond, ruleId)(rule) +// } +// } +// +// /** +// * Returns the result of running QueryPlan.transformExpressionsUp on this node +// * and all its children. +// */ +// def transformAllExpressionsUp( +// rule: PartialFunction[Expression, Expression]): LogicalPlan = { +// transformAllExpressionsUpWithPruning(AlwaysProcess.fn, UnknownRuleId)(rule) +// } +// } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala b/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala index 727b8377382..8d003c8be4c 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/metering/DeltaLogging.scala @@ -106,4 +106,14 @@ trait DeltaLogging thunk } } + + protected def recordFrameProfile[T](group: String, name: String)(thunk: => T): T = { + // future work to capture runtime information ... 
+ thunk + } + + protected def withDmqTag[T](thunk: => T): T = { + thunk + } + } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala new file mode 100644 index 00000000000..e1ed6164488 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala @@ -0,0 +1,90 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.schema + +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.types._ + +/** + * Utils to merge table schema with data schema. + * This is split from SchemaUtils, because finalSchema is introduced into DeltaMergeInto, + * and resolving the final schema is now part of. + */ +object SchemaMergingUtils { + + val DELTA_COL_RESOLVER: (String, String) => Boolean = + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution + + /** + * Returns pairs of (full column name path, field) in this schema as a list. For example, a schema + * like: + * | - a + * | | - 1 + * | | - 2 + * | - b + * | - c + * | | - `foo.bar` + * | | - 3 + * will return [ + * ([a], ), ([a, 1], ), ([a, 2], ), ([b], ), + * ([c], ), ([c, foo.bar], ), ([c, foo.bar, 3], ) + * ] + */ + def explode(schema: StructType): Seq[(Seq[String], StructField)] = { + def recurseIntoComplexTypes(complexType: DataType): Seq[(Seq[String], StructField)] = { + complexType match { + case s: StructType => explode(s) + case a: ArrayType => recurseIntoComplexTypes(a.elementType) + .map { case (path, field) => (Seq("element") ++ path, field) } + case m: MapType => + recurseIntoComplexTypes(m.keyType) + .map { case (path, field) => (Seq("key") ++ path, field) } ++ + recurseIntoComplexTypes(m.valueType) + .map { case (path, field) => (Seq("value") ++ path, field) } + case _ => Nil + } + } + + schema.flatMap { + case f @ StructField(name, s: StructType, _, _) => + Seq((Seq(name), f)) ++ + explode(s).map { case (path, field) => (Seq(name) ++ path, field) } + case f @ StructField(name, a: ArrayType, _, _) => + Seq((Seq(name), f)) ++ + recurseIntoComplexTypes(a).map { case (path, field) => (Seq(name) ++ path, field) } + case f @ StructField(name, m: MapType, _, _) => + Seq((Seq(name), f)) ++ + recurseIntoComplexTypes(m).map { case (path, field) => (Seq(name) ++ path, field) } + case f => (Seq(f.name), f) :: Nil + } + } + + /** + * Returns all column names in this schema as a flat list. 
For example, a schema like: + * | - a + * | | - 1 + * | | - 2 + * | - b + * | - c + * | | - nest + * | | - 3 + * will get flattened to: "a", "a.1", "a.2", "b", "c", "c.nest", "c.nest.3" + */ + def explodeNestedFieldNames(schema: StructType): Seq[String] = { + explode(schema).map { case (path, _) => path }.map(UnresolvedAttribute.apply(_).name) + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingPredicateBuilder.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingPredicateBuilder.scala new file mode 100644 index 00000000000..9cce8da3cdc --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingPredicateBuilder.scala @@ -0,0 +1,109 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import org.apache.spark.sql.Column + +/** + * A trait that defines interfaces for a data skipping predicate builder. + * + * Note that 'IsNull', 'IsNotNull' and 'StartsWith' are handled at a column (not expression) level + * within [[DataSkippingReaderBase.DataFiltersBuilder.constructDataFilters]]. + * + * Note that the 'value' passed in for each of the interface should be [[SkippingEligibleLiteral]]. + */ +private [stats] trait DataSkippingPredicateBuilder { + /** The predicate should match any file which contains the requested point. */ + def equalTo(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] + + /** The predicate should match any file which contains anything other than the rejected point. */ + def notEqualTo(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] + + /** + * The predicate should match any file which contains values less than the requested upper bound. + */ + def lessThan(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] + + /** + * The predicate should match any file which contains values less than or equal to the requested + * upper bound. + */ + def lessThanOrEqual(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] + + /** + * The predicate should match any file which contains values larger than the requested lower + * bound. + */ + def greaterThan(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] + + /** + * The predicate should match any file which contains values larger than or equal to the requested + * lower bound. + */ + def greaterThanOrEqual(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] +} + +/** + * A collection of supported data skipping predicate builders. + */ +object DataSkippingPredicateBuilder { + /** Predicate builder for skipping eligible columns. */ + case object ColumnBuilder extends ColumnPredicateBuilder +} + +/** + * Predicate builder for skipping eligible columns. 
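Returning to the `SchemaMergingUtils` helper introduced above, a small sketch of what `explodeNestedFieldNames` produces for a nested schema:

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val schema = new StructType()
  .add("a", new StructType()
    .add("b", IntegerType)
    .add("c", IntegerType))
  .add("d", StringType)

// Yields Seq("a", "a.b", "a.c", "d")
val flattened = SchemaMergingUtils.explodeNestedFieldNames(schema)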
+ */ +private [stats] class ColumnPredicateBuilder + extends DataSkippingPredicateBuilder with UsesMetadataFields { + def equalTo(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] = { + statsProvider.getPredicateWithStatTypes(colPath, MIN, MAX) { (min, max) => + min <= value && value <= max + } + } + + def notEqualTo(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] = { + statsProvider.getPredicateWithStatTypes(colPath, MIN, MAX) { (min, max) => + min < value || value < max + } + } + + def lessThan(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] = + statsProvider.getPredicateWithStatType(colPath, MIN)(_ < value) + + def lessThanOrEqual(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] = + statsProvider.getPredicateWithStatType(colPath, MIN)(_ <= value) + + def greaterThan(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] = + statsProvider.getPredicateWithStatType(colPath, MAX)(_ > value) + + def greaterThanOrEqual(statsProvider: StatsProvider, colPath: Seq[String], value: Column) + : Option[DataSkippingPredicate] = + statsProvider.getPredicateWithStatType(colPath, MAX)(_ >= value) +} + diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala new file mode 100644 index 00000000000..efc556ce365 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala @@ -0,0 +1,929 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.TypeUtils +import org.apache.spark.sql.delta.actions.{AddFile, Metadata, SingleAction} +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.DeltaDataSkippingType.DeltaDataSkippingType +import org.apache.spark.sql.delta.util.StateCache +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaLog, DeltaTableUtils, NoMapping} +import org.apache.spark.sql.execution.InSubqueryExec +import org.apache.spark.sql.expressions.SparkUserDefinedFunction +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.{DataFrame, _} +import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} + +/** + * Represents a stats column (MIN, MAX, etc) for a given (nested) user table column name. 
Used to + * keep track of which stats columns a data skipping query depends on. + * + * The `statType` is any value accepted by `getStatsColumnOpt()` (see trait `UsesMetadataFields`); + * `pathToColumn` is the nested name of the user column whose stats are to be accessed. + */ +private [stats] case class StatsColumn( + statType: String, + pathToColumn: Seq[String] = Nil) + +/** + * A data skipping predicate, which includes the expression itself, plus the set of stats columns + * that expression depends on. The latter is required to correctly handle missing stats, which would + * make the predicate unreliable; for details, see `DataSkippingReader.verifyStatsForFilter`. + * + * NOTE: It would be more accurate to call these "file keeping" predicates, because they specify the + * set of files a query must examine, not the set of rows a query can safely skip. + */ +private [stats] case class DataSkippingPredicate( + expr: Column, + referencedStats: Set[StatsColumn] +) + +/** + * Overloads the constructor for `DataSkippingPredicate`, allowing callers to pass referenced stats + * as individual arguments, rather than wrapped up as a Set. + * + * For example, instead of this: + * + * DataSkippingPredicate(pred, Set(stat1, stat2)) + * + * We can just do: + * + * DataSkippingPredicate(pred, stat1, stat2) + */ +private [stats] object DataSkippingPredicate { + def apply(filters: Column, referencedStats: StatsColumn*): DataSkippingPredicate = { + DataSkippingPredicate(filters, referencedStats.toSet) + } +} + +/** + * An extractor that matches on access of a skipping-eligible column. We only collect stats for leaf + * columns, so internal columns of nested types are ineligible for skipping. + * + * NOTE: This check is sufficient for safe use of NULL_COUNT stats, but safe use of MIN and MAX + * stats requires additional restrictions on column data type (see SkippingEligibleLiteral). + * + * @return The path to the column, if it exists and is eligible. Otherwise, return None. + */ +object SkippingEligibleColumn { + def unapply(arg: Expression): Option[Seq[String]] = { + // The arg isn't always resolved yet, but if it is we can save time and effort by rejecting + // non-atomic types, since they are never eligible for skipping. Otherwise, (resolved or + // not) the final type checking occurs in getStatsColumnOpt after full resolution. + if (arg.resolved && !arg.dataType.isInstanceOf[AtomicType]) { + None + } else { + searchChain(arg) + } + } + + private def searchChain(arg: Expression): Option[Seq[String]] = arg match { + case a: Attribute => Some(a.name :: Nil) + case GetStructField(child, _, Some(name)) => + searchChain(child).map(name +: _) + case g @ GetStructField(child, ord, None) if g.resolved => + searchChain(child).map(g.childSchema(ord).name +: _) + case _ => + None + } +} + +/** + * An extractor that matches on access of a skipping-eligible Literal. Delta tables track min/max + * stats for a limited set of data types, and only Literals of those types are skipping-eligible. + * + * WARNING: This extractor needs to be kept in sync with StatisticsCollection.statsCollector. + * + * @return The Literal, if it is eligible. Otherwise, return None. 
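A quick sketch of the eligibility rule the extractor below encodes:

import org.apache.spark.sql.delta.stats.SkippingEligibleLiteral
import org.apache.spark.sql.types._

// min/max skipping is only attempted for numeric, date, timestamp and string
// literals; all other types are rejected up front.
assert(SkippingEligibleLiteral.isEligibleDataType(IntegerType))
assert(SkippingEligibleLiteral.isEligibleDataType(DateType))
assert(SkippingEligibleLiteral.isEligibleDataType(StringType))
assert(!SkippingEligibleLiteral.isEligibleDataType(BooleanType))
assert(!SkippingEligibleLiteral.isEligibleDataType(BinaryType))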
+ */ +object SkippingEligibleLiteral { + def unapply(arg: Literal): Option[Column] = { + if (isEligibleDataType(arg.dataType)) Some(new Column(arg)) else None + } + + def isEligibleDataType(dt: DataType): Boolean = dt match { + case _: NumericType | DateType | TimestampType | StringType => true + case _ => false + } +} + +private[stats] object DataSkippingReader { + private[this] def col(e: Expression): Column = new Column(e) + def fold(e: Expression): Column = col(new Literal(e.eval(), e.dataType)) + + // Literals often used in the data skipping reader expressions. + val trueLiteral: Column = col(TrueLiteral) + val falseLiteral: Column = col(FalseLiteral) + val nullStringLiteral: Column = col(new Literal(null, StringType)) + val oneMillisecondLiteralExpr: Literal = { + val oneMillisecond = new CalendarInterval(0, 0, 1000 /* micros */) + new Literal(oneMillisecond, CalendarIntervalType) + } + + val sizeCollectorInputEncoders: Seq[Option[ExpressionEncoder[_]]] = Seq( + Option(ExpressionEncoder[Boolean]()), + Option(ExpressionEncoder[java.lang.Long]()), + Option(ExpressionEncoder[java.lang.Long]())) +} + +/** + * Adds the ability to use statistics to filter the set of files based on predicates + * to a [[org.apache.spark.sql.delta.Snapshot]] of a given Delta table. + */ +trait DataSkippingReaderBase + extends DeltaScanGenerator + with StatisticsCollection + with ReadsMetadataFields + with StateCache + with DeltaLogging { + + import DataSkippingReader._ + + def allFiles: Dataset[AddFile] + def path: Path + def version: Long + def metadata: Metadata + def sizeInBytes: Long + def deltaLog: DeltaLog + def schema: StructType + def numOfFiles: Long + def redactedPath: String + + val columnMappingMode = NoMapping + + private def useStats = spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING) + + /** Returns a DataFrame expression to obtain a list of files with parsed statistics. */ + private def withStatsInternal0: DataFrame = { + val implicits = spark.implicits + import implicits._ + allFiles.withColumn("stats", from_json($"stats", statsSchema)) + } + + private lazy val withStatsCache = + cacheDS(withStatsInternal0, s"Delta Table State with Stats #$version - $redactedPath") + + protected def withStatsInternal: DataFrame = withStatsCache.getDS + + /** All files with the statistics column dropped completely. */ + def withNoStats: DataFrame = allFiles.drop("stats") + + /** + * Returns a parsed and cached representation of files with statistics. + * + * + * @return cached [[DataFrame]] + */ + final def withStats: DataFrame = { + withStatsInternal + } + + + // Helper method for expression types that represent an IN-list of literal values. + // + // + // For excessively long IN-lists, we just test whether the file's min/max range overlaps the range + // spanned by the list's smallest and largest elements. + private def constructLiteralInListDataFilters(a: Expression, possiblyNullValues: Seq[Any]): + Option[DataSkippingPredicate] = { + val dt = a.dataType + // The Ordering we use for sorting cannot handle null values, and these can anyway + // be safely ignored because they will never cause an IN-list predicate to return TRUE. + val values = possiblyNullValues.filter(_ != null) + lazy val ordering = TypeUtils.getInterpretedOrdering(dt) + if (values.isEmpty) { + // Handle the trivial empty case even for otherwise ineligible types. + // NOTE: SQL forbids empty in-list, but InSubqueryExec could have an empty subquery result + // or IN-list may contain only NULLs. 
+ Some(DataSkippingPredicate(falseLiteral)) + } else if (!SkippingEligibleLiteral.isEligibleDataType(dt)) { + // Don't waste time building expressions for incompatible types + None + } + else { + // Emit filters for an imprecise range test that covers the entire entire list. + val min = Literal(values.min(ordering), dt) + val max = Literal(values.max(ordering), dt) + constructDataFilters(And(GreaterThanOrEqual(max, a), LessThanOrEqual(min, a))) + } + } + + /** + * Returns a file skipping predicate expression, derived from the user query, which uses column + * statistics to prune away files that provably contain no rows the query cares about. + * + * Specifically, the filter extraction code must obey the following rules: + * + * 1. Given a query predicate `e`, `constructDataFilters(e)` must return TRUE for a file unless we + * can prove `e` will not return TRUE for any row the file might contain. For example, given + * `a = 3` and min/max stat values [0, 100], this skipping predicate is safe: + * + * AND(minValues.a <= 3, maxValues.a >= 3) + * + * Because that condition must be true for any file that might possibly contain `a = 3`; the + * skipping predicate could return FALSE only if the max is too low, or the min too high; it + * could return NULL only if a is NULL in every row of the file. In both latter cases, it is + * safe to skip the file because `a = 3` can never evaluate to TRUE. + * + * 2. It is unsafe to apply skipping to operators that can evaluate to NULL or produce an error + * for non-NULL inputs. For example, consider this query predicate involving integer addition: + * + * a + 1 = 3 + * + * It might be tempting to apply the standard equality skipping predicate: + * + * AND(minValues.a + 1 <= 3, 3 <= maxValues.a + 1) + * + * However, the skipping predicate would be unsound, because the addition operator could + * trigger integer overflow (e.g. minValues.a = 0 and maxValues.a = INT_MAX), even though the + * file could very well contain rows satisfying a + 1 = 3. + * + * 3. Predicates involving NOT are ineligible for skipping, because `Not(constructDataFilters(e))` + * is seldom equivalent to `constructDataFilters(Not(e))`. For example, consider the query + * predicate: + * + * NOT(a = 1) + * + * A simple inversion of the data skipping predicate would be: + * + * NOT(AND(minValues.a <= 1, maxValues.a >= 1)) + * ==> OR(NOT(minValues.a <= 1), NOT(maxValues.a >= 1)) + * ==> OR(minValues.a > 1, maxValues.a < 1) + * + * By contrast, if we first combine the NOT with = to obtain + * + * a != 1 + * + * We get a different skipping predicate: + * + * NOT(AND(minValues.a = 1, maxValues.a = 1)) + * ==> OR(NOT(minValues.a = 1), NOT(maxValues.a = 1)) + * ==> OR(minValues.a != 1, maxValues.a != 1) + * + * A truth table confirms that the first (naively inverted) skipping predicate is incorrect: + * + * minValues.a + * | maxValues.a + * | | OR(minValues.a > 1, maxValues.a < 1) + * | | | OR(minValues.a != 1, maxValues.a != 1) + * 0 0 T T + * 0 1 F T !! first predicate wrongly skipped a = 0 + * 1 1 F F + * + * Fortunately, we may be able to eliminate NOT from some (branches of some) predicates: + * + * a. It is safe to push the NOT into the children of AND and OR using de Morgan's Law, e.g. + * + * NOT(AND(a, b)) ==> OR(NOT(a), NOT(B)). + * + * b. 
It is safe to fold NOT into other operators, when a negated form of the operator exists: + * + * NOT(NOT(x)) ==> x + * NOT(a == b) ==> a != b + * NOT(a > b) ==> a <= b + * + * NOTE: The skipping predicate must handle the case where min and max stats for a column are both + * NULL -- which indicates that all values in the file are NULL. Fortunately, most of the + * operators we support data skipping for are NULL intolerant, and thus trivially satisfy this + * requirement because they never return TRUE for NULL inputs. The only NULL tolerant operator we + * support -- IS [NOT] NULL -- is specifically NULL aware. + * + * NOTE: The skipping predicate does *NOT* need to worry about missing stats columns (which also + * manifest as NULL). That case is handled separately by `verifyStatsForFilter` (which disables + * skipping for any file that lacks the needed stats columns). + */ + private def constructDataFilters(dataFilter: Expression): + Option[DataSkippingPredicate] = dataFilter match { + // Push skipping predicate generation through the AND: + // + // constructDataFilters(AND(a, b)) + // ==> AND(constructDataFilters(a), constructDataFilters(b)) + // + // To see why this transformation is safe, consider that `constructDataFilters(a)` must evaluate + // to TRUE *UNLESS* we can prove that `a` would not evaluate to TRUE for any row the file might + // contain. Thus, if the rewritten form of the skipping predicate does not evaluate to TRUE, at + // least one of the skipping predicates must not have evaluated to TRUE, which in turn means we + // were able to prove that `a` and/or `b` will not evaulate to TRUE for any row of the file. If + // that is the case, then `AND(a, b)` also cannot evaluate to TRUE for any row of the file, + // which proves we have a valid data skipping predicate. + // + // NOTE: AND is special -- we can safely skip the file if one leg does not evaluate to TRUE, + // even if we cannot construct a skipping filter for the other leg. + case And(e1, e2) => + val e1Filter = constructDataFilters(e1) + val e2Filter = constructDataFilters(e2) + if (e1Filter.isDefined && e2Filter.isDefined) { + Some(DataSkippingPredicate( + e1Filter.get.expr && e2Filter.get.expr, + e1Filter.get.referencedStats ++ e2Filter.get.referencedStats)) + } else if (e1Filter.isDefined) { + e1Filter + } else { + e2Filter // possibly None + } + + // Use deMorgan's law to push the NOT past the AND. This is safe even with SQL tri-valued logic + // (see below), and is desirable because we cannot generally push predicate filters through NOT, + // but we *CAN* push predicate filters through AND and OR: + // + // constructDataFilters(NOT(AND(a, b))) + // ==> constructDataFilters(OR(NOT(a), NOT(b))) + // ==> OR(constructDataFilters(NOT(a)), constructDataFilters(NOT(b))) + // + // Assuming we can push the resulting NOT operations all the way down to some leaf operation it + // can fold into, the rewrite allows us to create a data skipping filter from the expression. + // + // a b AND(a, b) + // | | | NOT(AND(a, b)) + // | | | | OR(NOT(a), NOT(b)) + // T T T F F + // T F F T T + // T N N N N + // F F F T T + // F N F T T + // N N N N N + case Not(And(e1, e2)) => + constructDataFilters(Or(Not(e1), Not(e2))) + + // Push skipping predicate generation through OR (similar to AND case). 
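To make rule (1) above concrete, the file-keeping predicate for a query filter `a = 3`, written by hand against the parsed stats DataFrame built earlier in this file (assuming the conventional `minValues`/`maxValues` stats fields):

import org.apache.spark.sql.functions.{col, lit}

// Keep a file only if its [min, max] range for `a` can contain 3. If both
// stats are NULL (every value of `a` in the file is NULL), the predicate is
// NULL and where() drops the file, which is safe because `a = 3` can never be
// TRUE there. Missing stats columns are handled separately by
// verifyStatsForFilter.
val keepForAEquals3 =
  col("stats.minValues.a") <= lit(3) && col("stats.maxValues.a") >= lit(3)

// withStats.where(keepForAEquals3) yields the candidate files.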
+ // + // constructDataFilters(OR(a, b)) + // ==> OR(constructDataFilters(a), constructDataFilters(b)) + // + // Similar to AND case, if the rewritten predicate does not evaluate to TRUE, then it means that + // neither `constructDataFilters(a)` nor `constructDataFilters(b)` evaluated to TRUE, which in + // turn means that neither `a` nor `b` could evaulate to TRUE for any row the file might + // contain, which proves we have a valid data skipping predicate. + // + // Unlike AND, a single leg of an OR expression provides no filtering power -- we can only + // reject a file if both legs evaluate to false. + case Or(e1, e2) => + val e1Filter = constructDataFilters(e1) + val e2Filter = constructDataFilters(e2) + if (e1Filter.isDefined && e2Filter.isDefined) { + Some(DataSkippingPredicate( + e1Filter.get.expr || e2Filter.get.expr, + e1Filter.get.referencedStats ++ e2Filter.get.referencedStats)) + } else { + None + } + + // Similar to AND, we can (and want to) push the NOT past the OR using deMorgan's law. + case Not(Or(e1, e2)) => + constructDataFilters(And(Not(e1), Not(e2))) + + // Match any file whose null count is larger than zero. + case IsNull(SkippingEligibleColumn(a)) => + val nullCountCol = StatsColumn(NULL_COUNT, a) + getStatsColumnOpt(nullCountCol).map { nullCount => + DataSkippingPredicate(nullCount > Literal(0), nullCountCol) + } + case Not(IsNull(e)) => + constructDataFilters(IsNotNull(e)) + + // Match any file whose null count is less than the row count. + case IsNotNull(SkippingEligibleColumn(a)) => + val nullCountCol = StatsColumn(NULL_COUNT, a) + val numRecordsCol = StatsColumn(NUM_RECORDS) + getStatsColumnOpt(nullCountCol).flatMap { nullCount => + getStatsColumnOpt(numRecordsCol).map { numRecords => + DataSkippingPredicate(nullCount < numRecords, nullCountCol, numRecordsCol) + } + } + case Not(IsNotNull(e)) => + constructDataFilters(IsNull(e)) + + // Match any file whose min/max range contains the requested point. + case EqualTo(SkippingEligibleColumn(a), SkippingEligibleLiteral(v)) => + val minCol = StatsColumn(MIN, a) + val maxCol = StatsColumn(MAX, a) + getStatsColumnOpt(minCol).flatMap { min => + getStatsColumnOpt(maxCol).map { max => + DataSkippingPredicate(min <= v && max >= v, minCol, maxCol) + } + } + case EqualTo(v: Literal, a) => + constructDataFilters(EqualTo(a, v)) + + // Match any file whose min/max range contains anything other than the rejected point. + case Not(EqualTo(SkippingEligibleColumn(a), SkippingEligibleLiteral(v))) => + val minCol = StatsColumn(MIN, a) + val maxCol = StatsColumn(MAX, a) + getStatsColumnOpt(minCol).flatMap { min => + getStatsColumnOpt(maxCol).map { max => + DataSkippingPredicate(!(min === v && max === v), minCol, maxCol) + } + } + case Not(EqualTo(v: Literal, a)) => + constructDataFilters(Not(EqualTo(a, v))) + + // Match any file whose min is less than the requested upper bound. 
+ case LessThan(SkippingEligibleColumn(a), SkippingEligibleLiteral(v)) => + val minCol = StatsColumn(MIN, a) + getStatsColumnOpt(minCol).map { min => + DataSkippingPredicate(min < v, minCol) + } + case LessThan(v: Literal, a) => + constructDataFilters(GreaterThan(a, v)) + case Not(LessThan(a, b)) => + constructDataFilters(GreaterThanOrEqual(a, b)) + + // Match any file whose min is less than or equal to the requested upper bound + case LessThanOrEqual(SkippingEligibleColumn(a), SkippingEligibleLiteral(v)) => + val minCol = StatsColumn(MIN, a) + getStatsColumnOpt(minCol).map { min => + DataSkippingPredicate(min <= v, minCol) + } + case LessThanOrEqual(v: Literal, a) => + constructDataFilters(GreaterThanOrEqual(a, v)) + case Not(LessThanOrEqual(a, b)) => + constructDataFilters(GreaterThan(a, b)) + + // Match any file whose max is larger than the requested lower bound. + case GreaterThan(SkippingEligibleColumn(a), SkippingEligibleLiteral(v)) => + val maxCol = StatsColumn(MAX, a) + getStatsColumnOpt(maxCol).map { max => + DataSkippingPredicate(max > v, maxCol) + } + case GreaterThan(v: Literal, a) => + constructDataFilters(LessThan(a, v)) + case Not(GreaterThan(a, b)) => + constructDataFilters(LessThanOrEqual(a, b)) + + // Match any file whose max is larger than or equal to the requested lower bound. + case GreaterThanOrEqual(SkippingEligibleColumn(a), SkippingEligibleLiteral(v)) => + val maxCol = StatsColumn(MAX, a) + getStatsColumnOpt(maxCol).map { max => + DataSkippingPredicate(max >= v, maxCol) + } + case GreaterThanOrEqual(v: Literal, a) => + constructDataFilters(LessThanOrEqual(a, v)) + case Not(GreaterThanOrEqual(a, b)) => + constructDataFilters(LessThan(a, b)) + + // Similar to an equality test, except comparing against a prefix of the min/max stats, and + // neither commutative nor invertible. + case StartsWith(SkippingEligibleColumn(a), v @ Literal(s: UTF8String, StringType)) => + val sLen = s.numChars() + val minCol = StatsColumn(MIN, a) + val maxCol = StatsColumn(MAX, a) + getStatsColumnOpt(minCol).flatMap { min => + getStatsColumnOpt(maxCol).map { max => + DataSkippingPredicate( + substring(min, 0, sLen) <= v && substring(max, 0, sLen) >= v, + minCol, maxCol) + } + } + + // We can only handle-IN lists whose values can all be statically evaluated to literals. + case in @ In(a, values) if in.inSetConvertible => + constructLiteralInListDataFilters(a, values.map(_.asInstanceOf[Literal].value)) + + // The optimizer automatically converts all but the shortest eligible IN-lists to InSet. + case InSet(a, values) => + constructLiteralInListDataFilters(a, values.toSeq) + + // Treat IN(... subquery ...) as a normal IN-list, since the subquery already ran before now. + case in: InSubqueryExec => + // At this point the subquery has been materialized so it is safe to call get on the Option. + constructLiteralInListDataFilters(in.child, in.values().get.toSeq) + + // Remove redundant pairs of NOT + case Not(Not(e)) => + constructDataFilters(e) + + // WARNING: NOT is dangerous, because `Not(constructDataFilters(e))` is seldom equivalent to + // `constructDataFilters(Not(e))`. We must special-case every `Not(e)` we wish to support. + case Not(_) => None + + // Unknown expression type... can't use it for data skipping. + case _ => None + } + + /** + * Returns an expression to access the given statistics for a specific column, or None if that + * stats column does not exist. 
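Returning to the StartsWith case handled earlier in this match: it compares literal-length prefixes of the min/max stats. A standalone illustration (column names are examples, not from this patch):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, substring}

// For `c LIKE 'sp%'`, i.e. StartsWith(c, "sp"), compare two-character prefixes.
val prefix = "sp"
val skippingPredicate: Column =
  substring(col("stats.minValues.c"), 0, prefix.length) <= lit(prefix) &&
    substring(col("stats.maxValues.c"), 0, prefix.length) >= lit(prefix)

// A file with min = "alpha", max = "omega" yields "al" <= "sp" && "om" >= "sp",
// which is FALSE, so the file is skipped; min = "solar", max = "spring" keeps it.
```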
+ * + * @param statType One of the fields declared by trait `UsesMetadataFields` + * @param pathToColumn The components of the nested column name to get stats for. + */ + final protected def getStatsColumnOpt(statType: String, pathToColumn: Seq[String] = Nil) + : Option[Column] = { + import org.apache.spark.sql.delta.implicits._ + + // If the requested stats type doesn't even exist, just return None right away. This can + // legitimately happen if we have no stats at all, or if column stats are disabled (in which + // case only the NUM_RECORDS stat type is available). + if (!statsSchema.exists(_.name == statType)) { + return None + } + + // Given a set of path segments in reverse order, e.g. column a.b.c is Seq("c", "b", "a"), we + // use a foldRight operation to build up the requested stats column, by successively applying + // each new path step against both the table schema and the stats schema. We can't use the stats + // schema alone, because the caller-provided path segments use logical column names, while the + // stats schema requires physical column names. Instead, we must step into the table schema to + // extract that field's physical column name, and use the result to step into the stats schema. + // + // We use a three-tuple to track state. The traversal starts with the base column for the + // requested stat type, the stats schema for the requested stat type, and the table schema. Each + // step of the traversal emits the updated column, along with the stats schema and table schema + // elements corresponding to that column. + val initialState: Option[(Column, DataType, DataType)] = + Some((getBaseStatsColumn.getField(statType), statsSchema(statType).dataType, metadata.schema)) + pathToColumn + .foldRight(initialState) { + // NOTE: Only match on StructType, because we cannot traverse through other DataTypes. + case (fieldName, Some((statCol, statsSchema: StructType, tableSchema: StructType))) => + // First try to step into the table schema + val tableFieldOpt = tableSchema.findNestedFieldIgnoreCase(Seq(fieldName)) + + // If that worked, try to step into the stats schema, using its its physical name + val statsFieldOpt = tableFieldOpt + .map(DeltaColumnMapping.getPhysicalName) + .filter(physicalFieldName => statsSchema.exists(_.name == physicalFieldName)) + .map(statsSchema(_)) + + // If all that succeeds, return the new stats column and the corresponding data types. + statsFieldOpt.map(statsField => + (statCol.getField(statsField.name), statsField.dataType, tableFieldOpt.get.dataType)) + + // Propagate failure if the above match failed (or if already None) + case _ => None + } + // Filter out non-leaf columns -- they lack stats so skipping predicates can't use them. + .filterNot(_._2.isInstanceOf[StructType]) + .map { + case (statCol, TimestampType, _) if statType == MAX => + // SC-22824: For timestamps, JSON serialization will truncate to milliseconds. This means + // that we must adjust 1 millisecond upwards for max stats, or we will incorrectly skip + // records that differ only in microsecond precision. (For example, a file containing only + // 01:02:03.456789 will be written with min == max == 01:02:03.456, so we must consider it + // to contain the range from 01:02:03.456 to 01:02:03.457.) + // + // There is a longer term task SC-22825 to fix the serialization problem that caused this. + // But we need the adjustment in any case to correctly read stats written by old versions. 
+ new Column(Cast(TimeAdd(statCol.expr, oneMillisecondLiteralExpr), TimestampType)) + case (statCol, _, _) => + statCol + } + } + + /** + * Returns an expression to access the given statistics for a specific column, or a NULL + * literal expression if that column does not exist. + */ + final protected def getStatsColumnOrNullLiteral(statType: String, pathToColumn: Seq[String] = Nil) + : Column = + getStatsColumnOpt(statType, pathToColumn).getOrElse(lit(null)) + + /** Overload for convenience working with StatsColumn helpers */ + final protected def getStatsColumnOpt(stat: StatsColumn): Option[Column] = + getStatsColumnOpt(stat.statType, stat.pathToColumn) + + /** Overload for convenience working with StatsColumn helpers */ + final protected def getStatsColumnOrNullLiteral(stat: StatsColumn): Column = + getStatsColumnOrNullLiteral(stat.statType, stat.pathToColumn) + + /** + * Returns an expression that can be used to check that the required statistics are present for a + * given file. If any required statistics are missing we must include the corresponding file. + * + * NOTE: We intentionally choose to disable skipping for any file if any required stat is missing, + * because doing it that way allows us to check each stat only once (rather than once per + * use). Checking per-use would anyway only help for tables where the number of indexed columns + * has changed over time, producing add.stats_parsed records with differing schemas. That should + * be a rare enough case to not worry about optimizing for, given that the fix requires more + * complex skipping predicates that would penalize the common case. + */ + protected def verifyStatsForFilter(referencedStats: Set[StatsColumn]): Column = { + recordFrameProfile("Delta", "DataSkippingReader.verifyStatsForFilter") { + // The NULL checks for MIN and MAX stats depend on NULL_COUNT and NUM_RECORDS. Derive those + // implied dependencies first, so the main pass can treat them like any other column. + // + // NOTE: We must include explicit NULL checks on all stats columns we access here, because our + // caller will negate the expression we return. In case a stats column is NULL, `NOT(expr)` + // must return `TRUE`, and without these NULL checks it would instead return + // `NOT(NULL)` => `NULL`. + referencedStats.flatMap { stat => stat match { + case StatsColumn(MIN, _) | StatsColumn(MAX, _) => + Seq(stat, StatsColumn(NULL_COUNT, stat.pathToColumn), StatsColumn(NUM_RECORDS)) + case _ => + Seq(stat) + }}.map{stat => stat match { + case StatsColumn(MIN, _) | StatsColumn(MAX, _) => + // A usable MIN or MAX stat must be non-NULL, unless the column is provably all-NULL + // + // NOTE: We don't care about NULL/missing NULL_COUNT and NUM_RECORDS here, because the + // separate NULL checks we emit for those columns will force the overall validation + // predicate conjunction to FALSE in that case -- AND(FALSE, ) is FALSE. 
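The logical-to-physical traversal performed by `getStatsColumnOpt` above can be hard to follow from the fold alone. Below is a simplified, standalone sketch of the same idea, assuming forward-ordered path segments and omitting the stats-schema existence check; it is not the patch's code.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.delta.DeltaColumnMapping
import org.apache.spark.sql.types.{DataType, StructType}

// Walk the logical path through the *table* schema to learn each field's physical
// name, and use that physical name to step into the *stats* struct.
def resolveStatsColumn(
    statType: String,
    pathToColumn: Seq[String],          // forward order here, e.g. Seq("a", "b", "c")
    tableSchema: StructType,
    baseStatsColumn: Column): Option[Column] = {
  val start: Option[(Column, DataType)] =
    Some((baseStatsColumn.getField(statType), tableSchema))
  pathToColumn.foldLeft(start) {
    case (Some((statCol, schema: StructType)), fieldName) =>
      schema.fields.find(_.name.equalsIgnoreCase(fieldName)).map { field =>
        // getPhysicalName falls back to the display name when no column-mapping
        // metadata is present on the field.
        (statCol.getField(DeltaColumnMapping.getPhysicalName(field)), field.dataType)
      }
    case _ => None                      // non-struct step or missing field
  }.map(_._1)
}
```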
+ (getStatsColumnOrNullLiteral(stat).isNotNull || + (getStatsColumnOrNullLiteral(NULL_COUNT, stat.pathToColumn) === + getStatsColumnOrNullLiteral(NUM_RECORDS))) + case _ => + // Other stats, such as NULL_COUNT and NUM_RECORDS stat, merely need to be non-NULL + getStatsColumnOrNullLiteral(stat).isNotNull + }} + .reduceLeftOption(_.and(_)) + .getOrElse(trueLiteral) + } + } + + private def buildSizeCollectorFilter(): (ArrayAccumulator, Column => Column) = { + val implicits = spark.implicits + import implicits._ + val bytesCompressed = $"size" + val rows = getStatsColumnOrNullLiteral(NUM_RECORDS) + + val accumulator = new ArrayAccumulator(3) + spark.sparkContext.register(accumulator) + + // The arguments (order and datatype) must match the encoders defined in the + // `sizeCollectorInputEncoders` value. + val collector = (include: Boolean, bytesCompressed: java.lang.Long, rows: java.lang.Long) => { + if (include) { + accumulator.add((0, bytesCompressed)) /* count bytes of AddFiles */ + accumulator.add((1, Option(rows).map(_.toLong).getOrElse(-1L))) /* count rows in AddFiles */ + accumulator.add((2, 1)) /* count number of AddFiles */ + } + include + } + val collectorUdf = SparkUserDefinedFunction( + f = collector, + dataType = BooleanType, + inputEncoders = sizeCollectorInputEncoders, + deterministic = false) + + (accumulator, collectorUdf(_: Column, bytesCompressed, rows)) + } + + override def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame = { + DeltaLog.filterFileList(metadata.partitionSchema, withStats, partitionFilters) + } + + /** + * Get all the files in this table. + * + * @param keepNumRecords Also select `stats.numRecords` in the query. + * This may slow down the query as it has to parse json. + */ + protected def getAllFiles(keepNumRecords: Boolean): Seq[AddFile] = withDmqTag { + recordFrameProfile("Delta", "DataSkippingReader.getAllFiles") { + val implicits = spark.implicits + import implicits._ + + if (keepNumRecords) { + withStats // use withStats instead of allFiles so the `stats` column is already parsed + // keep only the numRecords field as a Json string in the stats field + .withColumn("stats", to_json(struct($"stats.numRecords" as 'numRecords))) + .as(SingleAction.addFileEncoder) + .collect().toSeq + } else { + allFiles.withColumn("stats", nullStringLiteral).as(SingleAction.addFileEncoder) + .collect().toSeq + } + } + } + + /** + * Given the partition filters on the data, rewrite these filters by pointing to the metadata + * columns. + */ + protected def constructPartitionFilters(filters: Seq[Expression]): Column = { + recordFrameProfile("Delta", "DataSkippingReader.constructPartitionFilters") { + val rewritten = DeltaLog.rewritePartitionFilters( + metadata.partitionSchema, spark.sessionState.conf.resolver, filters) + rewritten.reduceOption(And).map { expr => new Column(expr) }.getOrElse(trueLiteral) + } + } + + /** + * Get all the files in this table given the partition filter and the corresponding size of + * the scan. + * + * @param keepNumRecords Also select `stats.numRecords` in the query. + * This may slow down the query as it has to parse json. 
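The `buildSizeCollectorFilter` helper above relies on a side-effecting boolean UDF that passes its predicate value through unchanged while accumulating metrics. A simplified, standalone sketch of that pattern (toy column names, a single accumulator instead of three slots):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.util.LongAccumulator

val spark = SparkSession.builder().master("local[1]").appName("collector-sketch").getOrCreate()
val bytesSeen: LongAccumulator = spark.sparkContext.longAccumulator("bytesSeen")

// Non-deterministic so the optimizer does not eliminate or reorder the side effect.
val collect = udf { (include: Boolean, size: Long) =>
  if (include) bytesSeen.add(size)
  include
}.asNondeterministic()

// Toy "file list": keep files with even size, counting the bytes of what we keep.
val files = spark.range(3).selectExpr("id AS size", "id % 2 = 0 AS keep")
files.where(collect(col("keep"), col("size"))).count()
// bytesSeen.value now totals the sizes of the files that passed the filter.
```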
+ */ + protected def filterOnPartitions( + partitionFilters: Seq[Expression], + keepNumRecords: Boolean): (Seq[AddFile], DataSize) = withDmqTag { + recordFrameProfile("Delta", "DataSkippingReader.filterOnPartitions") { + val implicits = spark.implicits + import implicits._ + + val files = + if (keepNumRecords) { + // use withStats instead of allFiles so the `stats` column is already parsed + val filteredFiles = + DeltaLog.filterFileList(metadata.partitionSchema, withStats, partitionFilters) + filteredFiles + // keep only the numRecords field as a Json string in the stats field + .withColumn("stats", to_json(struct($"stats.numRecords" as 'numRecords))) + .as(SingleAction.addFileEncoder) + .collect() + } else { + val filteredFiles = + DeltaLog.filterFileList(metadata.partitionSchema, allFiles.toDF(), partitionFilters) + filteredFiles + .withColumn("stats", nullStringLiteral) + .as(SingleAction.addFileEncoder) + .collect() + } + + + val sizeInBytesByPartitionFilters = files.map(_.size).sum + + files.toSeq -> DataSize(Some(sizeInBytesByPartitionFilters), None, Some(files.size)) + } + } + + /** + * Given the partition and data filters, leverage data skipping statistics to find the set of + * files that need to be queried. Returns a tuple of the files and optionally the size of the + * scan that's generated if there were no filters, if there were only partition filters, and + * combined effect of partition and data filters respectively. + */ + protected def getDataSkippedFiles( + partitionFilters: Column, + dataFilters: DataSkippingPredicate, + keepNumRecords: Boolean): (Seq[AddFile], Seq[DataSize]) = withDmqTag { + recordFrameProfile("Delta", "DataSkippingReader.getDataSkippedFiles") { + val implicits = spark.implicits + import implicits._ + + val (totalSize, totalFilter) = buildSizeCollectorFilter() + val (partitionSize, partitionFilter) = buildSizeCollectorFilter() + val (scanSize, scanFilter) = buildSizeCollectorFilter() + + // NOTE: If any stats are missing, the value of `dataFilters` is untrustworthy -- it could be + // NULL or even just plain incorrect. We rely on `verifyStatsForFilter` to be FALSE in that + // case, forcing the overall OR to evaluate as TRUE no matter what value `dataFilters` takes. + val filteredFiles = withStats + .where(totalFilter(trueLiteral)) + .where(partitionFilter(partitionFilters)) + .where(scanFilter(dataFilters.expr || !verifyStatsForFilter(dataFilters.referencedStats))) + + val statsColumn = if (keepNumRecords) { + // keep only the numRecords field as a Json string in the stats field + to_json(struct($"stats.numRecords" as 'numRecords)) + } else nullStringLiteral + + val files = + recordFrameProfile("Delta", "DataSkippingReader.getDataSkippedFiles.collectFiles") { + filteredFiles.withColumn("stats", statsColumn).as(SingleAction.addFileEncoder).collect() + } + + files.toSeq -> Seq(DataSize(totalSize), DataSize(partitionSize), DataSize(scanSize)) + } + } + + private def getCorrectDataSkippingType( + dataSkippingType: DeltaDataSkippingType): DeltaDataSkippingType = { + dataSkippingType + } + + /** + * Gathers files that should be included in a scan based on the given predicates. + * Statistics about the amount of data that will be read are gathered and returned. 
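The NOTE above about untrustworthy `dataFilters` leans on SQL three-valued logic: a missing stat makes the skipping predicate NULL, and the `|| !verifyStatsForFilter(...)` term is what keeps the file. A standalone toy illustration (hypothetical column names):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().master("local[1]").appName("tri-valued").getOrCreate()
import spark.implicits._

// One file whose MIN stat is missing: `min_a <= 5` is NULL, `min_a IS NOT NULL`
// is FALSE, so `skip || !verify` is `NULL || TRUE`, which is TRUE -> keep the file.
val df = Seq((Option.empty[Int], 100L)).toDF("min_a", "numRecords")
val skip   = col("min_a") <= lit(5)     // NULL for this row
val verify = col("min_a").isNotNull     // FALSE for this row
df.select((skip || !verify).as("keepFile")).show()  // prints true
```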
+ */ + override def filesForScan( + projection: Seq[Attribute], + filters: Seq[Expression]): DeltaScan = + filesForScan(projection, filters, keepNumRecords = false) + + def filesForScan( + projection: Seq[Attribute], + filters: Seq[Expression], + keepNumRecords: Boolean): DeltaScan = { + val startTime = System.currentTimeMillis() + if (filters == Seq(TrueLiteral) || filters.isEmpty || schema.isEmpty) { + recordDeltaOperation(deltaLog, "delta.skipping.none") { + // When there are no filters we can just return allFiles with no extra processing + val dataSize = DataSize( + bytesCompressed = Some(sizeInBytes), + rows = None, + files = Some(numOfFiles)) + return DeltaScan( + version = version, + files = getAllFiles(keepNumRecords), + total = dataSize, + partition = dataSize, + scanned = dataSize)( + projection = AttributeSet(projection), + partitionFilters = ExpressionSet(Nil), + dataFilters = ExpressionSet(Nil), + unusedFilters = ExpressionSet(Nil), + scanDurationMs = System.currentTimeMillis() - startTime, + dataSkippingType = getCorrectDataSkippingType(DeltaDataSkippingType.noSkippingV1) + ) + } + } + + import DeltaTableUtils._ + val partitionColumns = metadata.partitionColumns + + // for data skipping, avoid using the filters that involve subqueries + val (subqueryFilters, flatFilters) = filters.partition(containsSubquery(_)) + + val (partitionFilters, dataFilters) = flatFilters + .partition(isPredicatePartitionColumnsOnly(_, partitionColumns, spark)) + + if (dataFilters.isEmpty) recordDeltaOperation(deltaLog, "delta.skipping.partition") { + // When there are only partition filters we can scan allFiles + // rather than withStats and thus we skip data skipping information. + val (files, scanSize) = filterOnPartitions(partitionFilters, keepNumRecords) + DeltaScan( + version = version, + files = files, + total = DataSize(Some(sizeInBytes), None, Some(numOfFiles)), + partition = scanSize, + scanned = scanSize)( + projection = AttributeSet(projection), + partitionFilters = ExpressionSet(partitionFilters), + dataFilters = ExpressionSet(Nil), + unusedFilters = ExpressionSet(subqueryFilters), + scanDurationMs = System.currentTimeMillis() - startTime, + dataSkippingType = + getCorrectDataSkippingType(DeltaDataSkippingType.partitionFilteringOnlyV1) + ) + } else recordDeltaOperation(deltaLog, "delta.skipping.data") { + val finalPartitionFilters = constructPartitionFilters(partitionFilters) + + val (skippingFilters, unusedFilters) = if (useStats) { + dataFilters.map(f => (f, constructDataFilters(f))).partition(f => f._2.isDefined) + } else { + (Nil, dataFilters.map(f => (f, None))) + } + + val finalSkippingFilters = skippingFilters + .map(_._2.get) + .reduceOption((skip1, skip2) => DataSkippingPredicate( + // Fold the filters into a conjunction, while unioning their referencedStats. 
+ skip1.expr && skip2.expr, skip1.referencedStats ++ skip2.referencedStats)) + .getOrElse((DataSkippingPredicate(trueLiteral))) + + val (files, sizes) = { + getDataSkippedFiles(finalPartitionFilters, finalSkippingFilters, keepNumRecords) + } + + val dataSkippingType = if (partitionFilters.isEmpty) { + DeltaDataSkippingType.dataSkippingOnlyV1 + } else { + DeltaDataSkippingType.dataSkippingAndPartitionFilteringV1 + } + + DeltaScan( + version = version, + files = files, + total = sizes(0), + partition = sizes(1), + scanned = sizes(2))( + projection = AttributeSet(projection), + partitionFilters = ExpressionSet(partitionFilters), + dataFilters = ExpressionSet(skippingFilters.map(_._1)), + unusedFilters = ExpressionSet(unusedFilters.map(_._1) ++ subqueryFilters), + scanDurationMs = System.currentTimeMillis() - startTime, + dataSkippingType = getCorrectDataSkippingType(dataSkippingType) + ) + } + } + + /** + * Get AddFile (with stats) actions corresponding to given set of paths in the Snapshot. + * If a path doesn't exist in snapshot, it will be ignored and no [[AddFile]] will be returned + * for it. + * @param paths Sequence of paths for which we want to get [[AddFile]] action + * @return a sequence of addFiles for the given `paths` + */ + def getSpecificFilesWithStats(paths: Seq[String]): Seq[AddFile] = { + withDmqTag { + val implicits = spark.implicits + import implicits._ + val right = paths.toDF("path") + allFiles.join(right, Seq("path"), "leftsemi").as(SingleAction.addFileEncoder).collect() + } + } +} + +trait DataSkippingReader extends DataSkippingReaderBase diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala index 0b818b732b2..164956b9463 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScan.scala @@ -16,46 +16,70 @@ package org.apache.spark.sql.delta.stats -import org.apache.spark.sql.delta.actions.AddFile import com.fasterxml.jackson.databind.annotation.JsonDeserialize - import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.stats.DeltaDataSkippingType.DeltaDataSkippingType /** + * DataSize describes following attributes for data that consists of a list of input files + * @param bytesCompressed total size of the data + * @param rows number of rows in the data + * @param files number of input files * Note: Please don't add any new constructor to this class. `jackson-module-scala` always picks up * the first constructor returned by `Class.getConstructors` but the order of the constructors list * is non-deterministic. 
(SC-13343) */ case class DataSize( - @JsonDeserialize(contentAs = classOf[java.lang.Long]) - bytesCompressed: Option[Long] = None, - @JsonDeserialize(contentAs = classOf[java.lang.Long]) - rows: Option[Long] = None) + @JsonDeserialize(contentAs = classOf[java.lang.Long]) + bytesCompressed: Option[Long] = None, + @JsonDeserialize(contentAs = classOf[java.lang.Long]) + rows: Option[Long] = None, + @JsonDeserialize(contentAs = classOf[java.lang.Long]) + files: Option[Long] = None +) object DataSize { def apply(a: ArrayAccumulator): DataSize = { DataSize( Option(a.value(0)).filterNot(_ == -1), - Option(a.value(1)).filterNot(_ == -1)) + Option(a.value(1)).filterNot(_ == -1), + Option(a.value(2)).filterNot(_ == -1) + ) } } +object DeltaDataSkippingType extends Enumeration { + type DeltaDataSkippingType = Value + // V1: code path in DataSkippingReader.scala, which needs StateReconstruction + // noSkipping: no skipping and get all files from the Delta table + // partitionFiltering: filtering and skipping based on partition columns + // dataSkipping: filtering and skipping based on stats columns + // limit: skipping based on limit clause in DataSkippingReader.scala + // filteredLimit: skipping based on limit clause and partition columns in DataSkippingReader.scala + val noSkippingV1, noSkippingV2, partitionFilteringOnlyV1, partitionFilteringOnlyV2, + dataSkippingOnlyV1, dataSkippingOnlyV2, dataSkippingAndPartitionFilteringV1, + dataSkippingAndPartitionFilteringV2, limit, filteredLimit = Value +} + /** * Used to hold details the files and stats for a scan where we have already * applied filters and a limit. */ case class DeltaScan( - version: Long, - files: Seq[AddFile], - total: DataSize, - partition: DataSize, - scanned: DataSize)( - // Moved to separate argument list, to not be part of case class equals check - - // expressions can differ by exprId or ordering, but as long as same files are scanned, the - // PreparedDeltaFileIndex and HadoopFsRelation should be considered equal for reuse purposes. - val partitionFilters: ExpressionSet, - val dataFilters: ExpressionSet, - val unusedFilters: ExpressionSet, - val projection: AttributeSet) { + version: Long, + files: Seq[AddFile], + total: DataSize, + partition: DataSize, + scanned: DataSize)( + // Moved to separate argument list, to not be part of case class equals check - + // expressions can differ by exprId or ordering, but as long as same files are scanned, the + // PreparedDeltaFileIndex and HadoopFsRelation should be considered equal for reuse purposes. + val partitionFilters: ExpressionSet, + val dataFilters: ExpressionSet, + val unusedFilters: ExpressionSet, + val projection: AttributeSet, + val scanDurationMs: Long, + val dataSkippingType: DeltaDataSkippingType) { def allFilters: ExpressionSet = partitionFilters ++ dataFilters ++ unusedFilters } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScanGenerator.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScanGenerator.scala new file mode 100644 index 00000000000..f7985025e31 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/DeltaScanGenerator.scala @@ -0,0 +1,39 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
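`DeltaScan` above keeps its filter expressions in a second parameter list specifically so they stay out of the generated equals/hashCode. A minimal toy illustration of that Scala behaviour (not the patch's class):

```scala
// Only the first parameter list of a case class participates in the generated
// equals/hashCode, so two scans over the same version and files compare equal
// even if their filter expressions differ by exprId or ordering.
case class ToyScan(version: Long, files: Seq[String])(val filters: Set[String])

val a = ToyScan(1L, Seq("part-0001.parquet"))(Set("x > 1"))
val b = ToyScan(1L, Seq("part-0001.parquet"))(Set("x >= 2"))
assert(a == b)                  // second parameter list is ignored by equals
assert(a.filters != b.filters)  // but the filters are still carried along
```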
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.delta.Snapshot + +/** Trait representing a class that can generate [[DeltaScan]] given filters, etc. */ +trait DeltaScanGeneratorBase { + /** The snapshot that the scan is being generated on. */ + val snapshotToScan: Snapshot + + /** + * Returns a DataFrame for the given partition filters. The schema of returned DataFrame is nearly + * the same as `AddFile`, except that the `stats` field is parsed to a struct from a json string. + */ + def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame + + /** Returns a [[DeltaScan]] based on the given filters. */ + def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan +} + + +trait DeltaScanGenerator extends DeltaScanGeneratorBase diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala new file mode 100644 index 00000000000..c55d12c5b3a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala @@ -0,0 +1,325 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import java.util.Objects + +import scala.collection.mutable + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types.StructType + +/** + * Before query planning, we prepare any scans over delta tables by pushing + * any projections or filters in allowing us to gather more accurate statistics + * for CBO and metering. + * + * Note the following + * - This rule also ensures that all reads from the same delta log use the same snapshot of log + * thus providing snapshot isolation. 
+ * - If this rule is invoked within an active [[OptimisticTransaction]], then the scans are + * generated using the transaction. + */ +trait PrepareDeltaScanBase extends Rule[LogicalPlan] + with PredicateHelper + with DeltaLogging { self: PrepareDeltaScan => + + private val snapshotIsolationEnabled = spark.conf.get(DeltaSQLConf.DELTA_SNAPSHOT_ISOLATION) + + /** + * Tracks the first-access snapshots of other logs planned by this rule. The snapshots are + * the keyed by the log's unique id. Note that the lifetime of this rule is a single + * query, therefore, the map tracks the snapshots only within a query. + */ + private val scannedSnapshots = + new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot] + + /** + * Gets the [[DeltaScanGenerator]] for the given log, which will be used to generate + * [[DeltaScan]]s. Every time this method is called on a log within the lifetime of this + * rule (i.e., the lifetime of the query for which this rule was instantiated), the returned + * generator will read a snapshot that is pinned on the first access for that log. + * + * Internally, it will use the snapshot of the file index, the snapshot of the active transaction + * (if any), or the latest snapshot of the given log. + */ + protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = { + // The first case means that we've fixed the table snapshot for time travel + if (index.isTimeTravelQuery) return index.getSnapshot + val scanGenerator = OptimisticTransaction.getActive().map(_.getDeltaScanGenerator(index)) + .getOrElse { + val snapshot = if (snapshotIsolationEnabled) { + scannedSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => { + // Will be called only when the log is accessed the first time + index.getSnapshot + }) + } else { + index.getSnapshot + } + snapshot + } + import PrepareDeltaScanBase._ + if (onGetDeltaScanGeneratorCallback != null) onGetDeltaScanGeneratorCallback(scanGenerator) + scanGenerator + } + + /** + * Helper method to generate a [[PreparedDeltaFileIndex]] + */ + protected def getPreparedIndex( + preparedScan: DeltaScan, + fileIndex: TahoeLogFileIndex): PreparedDeltaFileIndex = { + assert(fileIndex.partitionFilters.isEmpty, + "Partition filters should have been extracted by DeltaAnalysis.") + PreparedDeltaFileIndex( + spark, + fileIndex.deltaLog, + fileIndex.path, + preparedScan, + fileIndex.partitionSchema, + fileIndex.versionToUse) + } + + /** + * Scan files using the given `filters` and return the snapshot object used to + * scan files and `DeltaScan`. + */ + protected def filesForScan( + scanGenerator: DeltaScanGenerator, + limitOpt: Option[Int], + projection: Seq[Attribute], + filters: Seq[Expression], + delta: LogicalRelation): (Snapshot, DeltaScan) = { + withStatusCode("DELTA", "Filtering files for query") { + scanGenerator.snapshotToScan -> scanGenerator.filesForScan(projection, filters) + } + } + + /** + * Prepares delta scans sequentially. + */ + protected def prepareDeltaScan(plan: LogicalPlan): LogicalPlan = { + // A map from the canonicalized form of a DeltaTableScan operator to its corresponding delta + // scan and the snapshot we use to scan the table. This map is used to avoid fetching duplicate + // delta indexes for structurally-equal delta scans. 
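The per-query snapshot pinning in `getDeltaScanGenerator` above boils down to `computeIfAbsent` on a map keyed by the log's id. A toy sketch of the behaviour, where the version counter is a stand-in for `index.getSnapshot`:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

final case class PinnedSnapshot(version: Long)   // hypothetical stand-in for Snapshot

val latestVersion = new AtomicLong(10L)          // stand-in for the moving table tip
val pinned = new ConcurrentHashMap[String, PinnedSnapshot]()

// The first lookup for a log pins a snapshot; later lookups in the same query
// reuse it, which is what gives snapshot isolation across scans of one table.
val first = pinned.computeIfAbsent("logId", _ => PinnedSnapshot(latestVersion.get()))
latestVersion.incrementAndGet()                  // the table moves on mid-query
val second = pinned.computeIfAbsent("logId", _ => PinnedSnapshot(latestVersion.get()))
assert(first == second)                          // still the pinned version, 10
```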
+ val deltaScans = new mutable.HashMap[LogicalPlan, (Snapshot, DeltaScan)]() + + def transform(plan: LogicalPlan): LogicalPlan = + plan transform { + case scan @ DeltaTableScan(projection, filters, fileIndex, limit, delta) => + val scanGenerator = getDeltaScanGenerator(fileIndex) + val (scannedSnapshot, preparedScan) = deltaScans.getOrElseUpdate(scan.canonicalized, + filesForScan(scanGenerator, limit, projection, filters, delta)) + val preparedIndex = getPreparedIndex(preparedScan, fileIndex) + optimizeGeneratedColumns( + scannedSnapshot, scan, preparedIndex, filters, limit, delta) + } + + transform(plan) + } + + protected def optimizeGeneratedColumns( + scannedSnapshot: Snapshot, + scan: LogicalPlan, + preparedIndex: PreparedDeltaFileIndex, + filters: Seq[Expression], + limit: Option[Int], + delta: LogicalRelation): LogicalPlan = { + DeltaTableUtils.replaceFileIndex(scan, preparedIndex) + } + + override def apply(_plan: LogicalPlan): LogicalPlan = { + var plan = _plan + if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)) { + + // Should not be applied to subqueries to avoid duplicate delta jobs. + val isSubquery = plan.isInstanceOf[Subquery] || plan.isInstanceOf[SupportsSubquery] + // Should not be applied to DataSourceV2 write plans, because they'll be planned later + // through a V1 fallback and only that later planning takes place within the transaction. + val isDataSourceV2 = plan.isInstanceOf[V2WriteCommand] + if (isSubquery || isDataSourceV2) { + return plan + } + + prepareDeltaScan(plan) + } else { + // If this query is running inside an active transaction and is touching the same table + // as the transaction, then mark that the entire table as tainted to be safe. + OptimisticTransaction.getActive.foreach { txn => + val logsInPlan = plan.collect { case DeltaTable(fileIndex) => fileIndex.deltaLog } + if (logsInPlan.exists(_.isSameLogAs(txn.deltaLog))) { + txn.readWholeTable() + } + } + + // Just return the plan if statistics based skipping is off. + // It will fall back to just partition pruning at planning time. + plan + } + } + + /** + * This is an extractor object. See https://docs.scala-lang.org/tour/extractor-objects.html. + */ + object DeltaTableScan { + + /** + * The components of DeltaTableScanType are: + * - an `AttributeSet` of the project collected by `PhysicalOperation` + * - filter expressions collected by `PhysicalOperation` + * - the `TahoeLogFileIndex` of the matched DeltaTable` + * - integer value of limit expression, if any + * - matched `DeltaTable` + */ + private type DeltaTableScanType = + (Seq[Attribute], Seq[Expression], TahoeLogFileIndex, Option[Int], LogicalRelation) + + /** + * This is an extractor method (basically, the opposite of a constructor) which takes in an + * object `plan` and tries to give back the arguments as a [[DeltaTableScanType]]. 
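For readers unfamiliar with the extractor-object pattern referenced above, a minimal standalone example of `unapply`:

```scala
// unapply inspects a value and, on success, returns the components to bind in a
// pattern match -- the same mechanism DeltaTableScan uses to pull
// (projection, filters, fileIndex, limit, relation) out of a logical plan.
object Even {
  def unapply(n: Int): Option[Int] = if (n % 2 == 0) Some(n / 2) else None
}

42 match {
  case Even(half) => println(s"even, half = $half")   // prints: even, half = 21
  case _          => println("odd")
}
```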
+ */ + def unapply(plan: LogicalPlan): Option[DeltaTableScanType] = { + plan match { + case PhysicalOperation( + project, + filters, + delta @ DeltaTable(fileIndex: TahoeLogFileIndex)) => + val projects = AttributeSet(project).toSeq + val allFilters = fileIndex.partitionFilters ++ filters + Some((projects, allFilters, fileIndex, None, delta)) + + case _ => None + } + } + + private def containsPartitionFiltersOnly( + filters: Seq[Expression], + fileIndex: TahoeLogFileIndex): Boolean = { + val partitionColumns = fileIndex.deltaLog.snapshot.metadata.partitionColumns + import DeltaTableUtils._ + filters.forall(expr => !containsSubquery(expr) && + isPredicatePartitionColumnsOnly(expr, partitionColumns, spark)) + } + } +} + +class PrepareDeltaScan(protected val spark: SparkSession) + extends PrepareDeltaScanBase + +object PrepareDeltaScanBase { + + /** + * Optional callback function that is called after `getDeltaScanGenerator` is called + * by the PrepareDeltaScan rule. This is primarily used for testing purposes. + */ + @volatile private var onGetDeltaScanGeneratorCallback: DeltaScanGenerator => Unit = _ + + /** + * Run a thunk of code with the given callback function injected into the PrepareDeltaScan rule. + * The callback function is called after `getDeltaScanGenerator` is called + * by the PrepareDeltaScan rule. This is primarily used for testing purposes. + */ + private[delta] def withCallbackOnGetDeltaScanGenerator[T]( + callback: DeltaScanGenerator => Unit)(thunk: => T): T = { + try { + onGetDeltaScanGeneratorCallback = callback + thunk + } finally { + onGetDeltaScanGeneratorCallback = null + } + } +} + +/** + * A [[TahoeFileIndex]] that uses a prepared scan to return the list of relevant files. + * This is injected into a query right before query planning by [[PrepareDeltaScan]] so that + * CBO and metering can accurately understand how much data will be read. + * + * @param versionScanned The version of the table that is being scanned, if a specific version + * has specifically been requested, e.g. by time travel. + */ +case class PreparedDeltaFileIndex( + override val spark: SparkSession, + override val deltaLog: DeltaLog, + override val path: Path, + preparedScan: DeltaScan, + override val partitionSchema: StructType, + versionScanned: Option[Long]) + extends TahoeFileIndex(spark, deltaLog, path) with DeltaLogging { + + override def tableVersion: Long = preparedScan.version + + /** + * Returns all matching/valid files by the given `partitionFilters` and `dataFilters` + */ + override def matchingFiles( + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[AddFile] = { + val actualFilters = ExpressionSet(partitionFilters ++ dataFilters) + if (preparedScan.allFilters == actualFilters) { + preparedScan.files.distinct + } else { + logInfo( + s""" + |Prepared scan does not match actual filters. Reselecting files to query. + |Prepared: ${preparedScan.allFilters} + |Actual: ${actualFilters} + """.stripMargin) + deltaLog.getSnapshotAt(preparedScan.version).filesForScan( + projection = Nil, partitionFilters ++ dataFilters).files + } + } + + /** + * Returns the list of files that will be read when scanning this relation. This call may be + * very expensive for large tables. 
+ */ + override def inputFiles: Array[String] = + preparedScan.files.map(f => absolutePath(f.path).toString).toArray + + /** Refresh any cached file listings */ + override def refresh(): Unit = { } + + /** Sum of table file sizes, in bytes */ + override def sizeInBytes: Long = + preparedScan.scanned.bytesCompressed + .getOrElse(spark.sessionState.conf.defaultSizeInBytes) + + override def equals(other: Any): Boolean = other match { + case p: PreparedDeltaFileIndex => + p.deltaLog == deltaLog && p.path == path && p.preparedScan == preparedScan && + p.partitionSchema == partitionSchema && p.versionScanned == versionScanned + case _ => false + } + + override def hashCode(): Int = { + Objects.hash(deltaLog, path, preparedScan, partitionSchema, versionScanned) + } + +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala new file mode 100644 index 00000000000..e39310e669c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/StatisticsCollection.scala @@ -0,0 +1,298 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +// scalastyle:off import.ordering.noEmptyLine +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.delta.DeltaOperations.ComputeStats +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.commands.DeltaCommand +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaLog, DeltaUDF} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ + +import scala.collection.mutable.ArrayBuffer + +/** + * Used to report metrics on how predicates are used to prune the set of + * files that are read by a query. + * + * @param predicate A user readable version of the predicate. + * @param pruningType One of {partition, dataStats, none}. + * @param filesMissingStats The number of files that were included due to missing statistics. + * @param filesDropped The number of files that were dropped by this predicate. + */ +case class QueryPredicateReport( + predicate: String, + pruningType: String, + filesMissingStats: Long, + filesDropped: Long) + +/** Used to report details about prequery filtering of what data is scanned. */ +case class FilterMetric(numFiles: Long, predicates: Seq[QueryPredicateReport]) + +/** + * A helper trait that constructs expressions that can be used to collect global + * and column level statistics for a collection of data, given its schema. + * + * Global statistics (such as the number of records) are stored as top level columns. 
+ * Per-column statistics (such as min/max) are stored in a struct that mirrors the + * schema of the data. + * + * To illustrate, here is an example of a data schema along with the schema of the statistics + * that would be collected. + * + * Data Schema: + * {{{ + * |-- a: struct (nullable = true) + * | |-- b: struct (nullable = true) + * | | |-- c: long (nullable = true) + * }}} + * + * Collected Statistics: + * {{{ + * |-- stats: struct (nullable = true) + * | |-- numRecords: long (nullable = false) + * | |-- minValues: struct (nullable = false) + * | | |-- a: struct (nullable = false) + * | | | |-- b: struct (nullable = false) + * | | | | |-- c: long (nullable = true) + * | |-- maxValues: struct (nullable = false) + * | | |-- a: struct (nullable = false) + * | | | |-- b: struct (nullable = false) + * | | | | |-- c: long (nullable = true) + * | |-- nullCount: struct (nullable = false) + * | | |-- a: struct (nullable = false) + * | | | |-- b: struct (nullable = false) + * | | | | |-- c: long (nullable = true) + * }}} + */ +trait StatisticsCollection extends UsesMetadataFields with DeltaLogging { + protected def spark: SparkSession + def dataSchema: StructType + val numIndexedCols: Int + + /** + * statCollectionSchema is the schema that is composed of all the columns that have the stats + * collected with our current table configuration. + */ + lazy val statCollectionSchema: StructType = { + if (numIndexedCols >= 0) { + truncateSchema(dataSchema, numIndexedCols)._1 + } else { + dataSchema + } + } + + /** + * Returns a struct column that can be used to collect statistics for the current + * schema of the table. + * The types we keep stats on must be consistent with DataSkippingReader.SkippingEligibleLiteral. + */ + lazy val statsCollector: Column = { + val stringPrefix = 32 + + struct( + count(new Column("*")) as NUM_RECORDS, + collectStats(MIN, statCollectionSchema) { + // Truncate string min values as necessary + case (c, f) if f.dataType == StringType => + substring(min(c), 0, stringPrefix) + + // Collect all numeric min values + case (c, f) if f.dataType.isInstanceOf[NumericType] || + f.dataType == DateType || + f.dataType == TimestampType => + min(c) + }, + collectStats(MAX, statCollectionSchema) { + // Truncate and pad string max values as necessary + case (c, f) if f.dataType == StringType => + val udfTruncateMax = + DeltaUDF.stringStringUdf(StatisticsCollection.truncateMaxStringAgg(stringPrefix)_) + udfTruncateMax(max(c)) + + // Collect all numeric max values + case (c, f) if f.dataType.isInstanceOf[NumericType] || + f.dataType == DateType || + f.dataType == TimestampType => + max(c) + }, + collectStats(NULL_COUNT, statCollectionSchema) { + case (c, f) => sum(when(c.isNull, 1).otherwise(0)) + } + ) as 'stats + } + + /** Returns schema of the statistics collected. */ + lazy val statsSchema: StructType = { + recordFrameProfile("Delta", "StatisticsCollection.statsSchema") { + // We invoke the analyzer here to actually figure out what the schema of + // statistics column should be. + { + val s = Dataset.ofRows(spark, LocalRelation(dataSchema.toAttributes)) + .select(statsCollector) + .schema + .find(_.name == "stats") + .get + .dataType + .asInstanceOf[StructType] + // We cannot write null types to Parquet, therefore we need to filter them out. 
minValues + // and maxValues can be null when we cannot collect stats on any of the data columns + StructType(s.filterNot(_.dataType.isInstanceOf[NullType])).asNullable + } + } + } + + /** + * Generate a truncated data schema for stats collection + * @param schema the original data schema + * @param indexedCols the maximum number of leaf columns to collect stats on + * @return truncated schema and the number of leaf columns in this schema + */ + private def truncateSchema(schema: StructType, indexedCols: Int): (StructType, Int) = { + var accCnt = 0 + var i = 0 + var fields = ArrayBuffer[StructField]() + while (i < schema.length && accCnt < indexedCols) { + val field = schema.fields(i) + val newField = field match { + case StructField(name, st: StructType, nullable, metadata) => + val (newSt, cnt) = truncateSchema(st, indexedCols - accCnt) + accCnt += cnt + StructField(name, newSt, nullable, metadata) + case f => + accCnt += 1 + f + } + i += 1 + fields += newField + } + (StructType(fields.toSeq), accCnt) + } + + /** + * Recursively walks the given schema, constructing an expression to calculate + * multiple statistics that mirrors structure of the data. When `function` is + * defined for a given column, it return value is added to statistics structure. + * When `function` is not defined, that column is skipped. + * + * @param name The name of the top level column for this statistic (i.e. minValues). + * @param schema The schema of the data to collect statistics from. + * @param function A partial function that is passed both a column and metadata about that + * column. Based on the metadata, it can decide if the given statistic + * should be collected by returning the correct aggregate expression. + */ + private def collectStats( + name: String, + schema: StructType)( + function: PartialFunction[(Column, StructField), Column]): Column = { + val allStats = collectStats(schema, None, function) + val stats = if (numIndexedCols > 0) { + allStats.take(numIndexedCols) + } else { + allStats + } + + if (stats.nonEmpty) { + struct(stats: _*).as(name) + } else { + lit(null).as(name) + } + } + + /** Inner recursive call for `collectStats`, should only be called by its overloaded companion. */ + private def collectStats( + schema: StructType, + parent: Option[Column], + function: PartialFunction[(Column, StructField), Column]): Seq[Column] = { + schema.flatMap { + case f @ StructField(name, s: StructType, _, _) => + val column = parent.map(_.getItem(name)) + .getOrElse(new Column(UnresolvedAttribute.quoted(name))) + val stats = collectStats(s, Some(column), function) + if (stats.nonEmpty) { + Some(struct(stats: _*) as DeltaColumnMapping.getPhysicalName(f)) + } else { + None + } + case f @ StructField(name, _, _, _) => + val column = parent.map(_.getItem(name)) + .getOrElse(new Column(UnresolvedAttribute.quoted(name))) + // alias the column with its physical name + function.lift((column, f)).map(_.as(DeltaColumnMapping.getPhysicalName(f))) + } + } +} + +object StatisticsCollection extends DeltaCommand { + /** + * Recomputes statistics for a Delta table. This can be used to compute stats if they were never + * collected or to recompute corrupted statistics. + * @param deltaLog Delta log for the table to update. + * @param predicates Which subset of the data to recompute stats for. Predicates must use only + * partition columns. + * @param fileFilter Filter for which AddFiles to recompute stats for. 
+ */ + def recompute( + spark: SparkSession, + deltaLog: DeltaLog, + predicates: Seq[Expression] = Seq(Literal(true)), + fileFilter: AddFile => Boolean = af => true): Unit = { + val txn = deltaLog.startTransaction() + verifyPartitionPredicates(spark, txn.metadata.partitionColumns, predicates) + + // Save the current AddFiles that match the predicates so we can update their stats + val files = txn.filterFiles(predicates).filter(fileFilter) + val pathToAddFileMap = generateCandidateFileMap(deltaLog.dataPath, files) + + // Use the stats collector to recompute stats + val dataPath = deltaLog.dataPath + val newStats = deltaLog.createDataFrame(txn.snapshot, addFiles = files, isStreaming = false) + .groupBy(input_file_name()).agg(to_json(txn.statsCollector)) + + // Use the new stats to update the AddFiles and commit back to the DeltaLog + val newAddFiles = newStats.collect().map { r => + val add = getTouchedFile(dataPath, r.getString(0), pathToAddFileMap) + add.copy(dataChange = false, stats = r.getString(1)) + } + txn.commit(newAddFiles, ComputeStats(predicates.map(_.sql))) + } + + /** + * Helper method to truncate the input string `x` to the given `prefixLen` length, while also + * appending the unicode max character to the end of the truncated string. This ensures that any + * value in this column is less than or equal to the max. + */ + def truncateMaxStringAgg(prefixLen: Int)(x: String): String = { + if (x == null || x.length <= prefixLen) { + x + } else { + // Grab the prefix. We want to append `\ufffd` as a tie-breaker, but that is only safe + // if the character we truncated was smaller. Keep extending the prefix until that + // condition holds, or we run off the end of the string. + // scalastyle:off nonascii + val tieBreaker = '\ufffd' + x.take(prefixLen) + x.substring(prefixLen).takeWhile(_ >= tieBreaker) + tieBreaker + // scalastyle:off nonascii + } + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/StatsProvider.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/StatsProvider.scala new file mode 100644 index 00000000000..df1233068c2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/StatsProvider.scala @@ -0,0 +1,104 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import org.apache.spark.sql.Column + +/** + * A helper class that provides the functionalities to create [[DataSkippingPredicate]] with + * the statistics for a column. + * + * @param getStat A function that returns an expression to access the given statistics for a + * specific column, or None if that stats column does not exist. For example, + * [[DataSkippingReaderBase.getStatsColumnOpt]] can be used here. 
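A few worked examples of `truncateMaxStringAgg` above with `prefixLen = 4`, showing the tie-breaker rule:

```scala
import org.apache.spark.sql.delta.stats.StatisticsCollection

// scalastyle:off nonascii
// "delta" truncates to "delt"; the dropped 'a' is below '\ufffd', so the tie-breaker
// is appended immediately. Strings at or under the prefix length pass through
// unchanged. Characters >= '\ufffd' after the prefix are kept, so the result still
// upper-bounds the original value.
assert(StatisticsCollection.truncateMaxStringAgg(4)("delta") == "delt\ufffd")
assert(StatisticsCollection.truncateMaxStringAgg(4)("abc") == "abc")
assert(StatisticsCollection.truncateMaxStringAgg(4)("abcd\uffff!") == "abcd\uffff\ufffd")
// scalastyle:on nonascii
```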
+ */ + +private [stats] class StatsProvider(getStat: StatsColumn => Option[Column]) { + /** + * Given a [[StatsColumn]], which represents a stats column for a table column, returns a + * [[DataSkippingPredicate]] which includes a data skipping expression (the result of running + * `f` on the expression of accessing the given stats) and the stats column (which the data + * skipping expression depends on), or None if the stats column does not exist. + * + * @param statCol A stats column (MIN, MAX, etc) for a table column name. + * @param f A user-provided function that returns a data skipping expression given the expression + * to access the statistics for `statCol`. + * @return A [[DataSkippingPredicate]] with a data skipping expression, or None if the given + * stats column does not exist. + */ + def getPredicateWithStatsColumn(statCol: StatsColumn) + (f: Column => Column): Option[DataSkippingPredicate] = { + for (stat <- getStat(statCol)) + yield DataSkippingPredicate(f(stat), statCol) + } + + /** A variant of [[getPredicateWithStatsColumn]] with two stats columns. */ + def getPredicateWithStatsColumns(statCol1: StatsColumn, statCol2: StatsColumn) + (f: (Column, Column) => Column): Option[DataSkippingPredicate] = { + for (stat1 <- getStat(statCol1); stat2 <- getStat(statCol2)) + yield DataSkippingPredicate(f(stat1, stat2), statCol1, statCol2) + } + + /** A variant of [[getPredicateWithStatsColumn]] with three stats columns. */ + def getPredicateWithStatsColumns( + statCol1: StatsColumn, + statCol2: StatsColumn, + statCol3: StatsColumn) + (f: (Column, Column, Column) => Column): Option[DataSkippingPredicate] = { + for (stat1 <- getStat(statCol1); stat2 <- getStat(statCol2); stat3 <- getStat(statCol3)) + yield DataSkippingPredicate(f(stat1, stat2, stat3), statCol1, statCol2, statCol3) + } + + /** + * Given a path to a table column and a stat type (MIN, MAX, etc.), returns a + * [[DataSkippingPredicate]] which includes a data skipping expression (the result of running + * `f` on the expression of accessing the given stats) and the stats column (which the data + * skipping expression depends on), or None if the stats column does not exist. + * + * @param pathToColumn The name of a column whose stats are to be accessed. + * @param statType The type of stats to access (MIN, MAX, etc.) + * @param f A user-provided function that returns a data skipping expression given the expression + * to access the statistics for `statCol`. + * @return A [[DataSkippingPredicate]] with a data skipping expression, or None if the given + * stats column does not exist. + */ + def getPredicateWithStatType(pathToColumn: Seq[String], statType: String) + (f: Column => Column): Option[DataSkippingPredicate] = { + getPredicateWithStatsColumn(StatsColumn(statType, pathToColumn))(f) + } + + /** A variant of [[getPredicateWithStatType]] with two stat types. */ + def getPredicateWithStatTypes(pathToColumn: Seq[String], statType1: String, statType2: String) + (f: (Column, Column) => Column): Option[DataSkippingPredicate] = { + getPredicateWithStatsColumns( + StatsColumn(statType1, pathToColumn), + StatsColumn(statType2, pathToColumn))(f) + } + + /** A variant of [[getPredicateWithStatType]] with three stat types. 
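A hypothetical usage sketch of `StatsProvider`, rebuilding the equality skipping predicate (`min <= v && max >= v`) for a column `a`. The `lookupStats` stub stands in for a real stats accessor such as `DataSkippingReaderBase.getStatsColumnOpt`, and the code assumes it runs inside the `org.apache.spark.sql.delta.stats` package since the class is package-private:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.lit

// Stub stats accessor; a real caller would pass getStatsColumnOpt instead.
def lookupStats(stat: StatsColumn): Option[Column] = None

val statsProvider = new StatsProvider(lookupStats)
val v: Column = lit(25)

// Returns Some(predicate referencing minValues.a and maxValues.a), or None if
// either stat is missing, in which case the file simply cannot be skipped.
val predicateOpt: Option[DataSkippingPredicate] =
  statsProvider.getPredicateWithStatTypes(Seq("a"), "minValues", "maxValues") {
    (min, max) => min <= v && max >= v
  }
```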
*/ + def getPredicateWithStatTypes( + pathToColumn: Seq[String], + statType1: String, + statType2: String, + statType3: String) + (f: (Column, Column, Column) => Column): Option[DataSkippingPredicate] = { + getPredicateWithStatsColumns( + StatsColumn(statType1, pathToColumn), + StatsColumn(statType2, pathToColumn), + StatsColumn(statType3, pathToColumn))(f) + } +} diff --git a/core/src/main/scala/org/apache/spark/sql/delta/stats/UsesMetadataFields.scala b/core/src/main/scala/org/apache/spark/sql/delta/stats/UsesMetadataFields.scala new file mode 100644 index 00000000000..fb8ed036cde --- /dev/null +++ b/core/src/main/scala/org/apache/spark/sql/delta/stats/UsesMetadataFields.scala @@ -0,0 +1,44 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.col + +/** + * A mixin trait for all interfaces that would like to use information stored in Delta's transaction + * log. + */ +trait UsesMetadataFields { + /* The total number of records in the file. */ + final val NUM_RECORDS = "numRecords" + /* The smallest (possibly truncated) value for a column. */ + final val MIN = "minValues" + /* The largest (possibly truncated) value for a column. */ + final val MAX = "maxValues" + /* The number of null values present for a column. */ + final val NULL_COUNT = "nullCount" +} + +/** + * A mixin trait that provides access to the stats fields in the transaction log. + */ +trait ReadsMetadataFields extends UsesMetadataFields { + /** Returns a Column that references the stats field data skipping should use */ + def getBaseStatsColumn: Column = col(getBaseStatsColumnName) + def getBaseStatsColumnName: String = "stats" +}
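To tie the constants above back to the skipping code: `getBaseStatsColumn.getField(...)` is plain struct-field access on the parsed `stats` column of the `withStats` DataFrame. For example (column name `a` is illustrative):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Hand-written equivalents of what ReadsMetadataFields provides:
val numRecords: Column = col("stats").getField("numRecords")   // getBaseStatsColumn.getField(NUM_RECORDS)
val minOfA: Column     = col("stats").getField("minValues").getField("a")
```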