
Commit

add statistics
Signed-off-by: Peng Huo <penghuo@gmail.com>
penghuo committed Mar 7, 2023
1 parent aec823a commit 79b5fd3
Showing 20 changed files with 2,367 additions and 107 deletions.
@@ -16,11 +16,10 @@

package io.delta.sql

import org.apache.spark.sql.delta._
import io.delta.sql.parser.DeltaSqlParser

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.stats.PrepareDeltaScan

/**
* An extension for Spark SQL to activate Delta SQL parser to support Delta SQL grammar.
@@ -90,8 +89,11 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule { session =>
new PreprocessTableDelete(session.sessionState.conf)
}
extensions.injectOptimizerRule { session =>
new ActiveOptimisticTransactionRule(session)
// We don't use `injectOptimizerRule` here because we don't want to apply further optimizations
// after `PrepareDeltaScan`.
// For example, `ConstantFolding` will break unit tests in `OptimizeGeneratedColumnSuite`.
extensions.injectPreCBORule { session =>
new PrepareDeltaScan(session)
}
}
}
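A minimal sketch (not part of this commit) of the pre-CBO injection point used above, assuming Spark 3.1+ where `injectPreCBORule` is available; `NoOpPreCBORule` and `ExamplePreCBOExtension` are hypothetical names. Rules injected this way run after the operator optimization batch and before cost-based optimization, so rules such as `ConstantFolding` have already fired and cannot rewrite the injected rule's output.

import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: does nothing, but sits at the same injection point as PrepareDeltaScan.
class NoOpPreCBORule(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class ExamplePreCBOExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Registered pre-CBO, i.e. after the main optimizer batches, unlike injectOptimizerRule.
    extensions.injectPreCBORule { session => new NoOpPreCBORule(session) }
  }
}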
@@ -0,0 +1,72 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.types.StructField

import java.util.{Locale, UUID}

trait DeltaColumnMappingBase extends DeltaLogging {
val MIN_WRITER_VERSION = 5
val MIN_READER_VERSION = 2
val MIN_PROTOCOL_VERSION = Protocol(MIN_READER_VERSION, MIN_WRITER_VERSION)

val PARQUET_FIELD_ID_METADATA_KEY = "parquet.field.id"
val COLUMN_MAPPING_METADATA_PREFIX = "delta.columnMapping."
val COLUMN_MAPPING_METADATA_ID_KEY = COLUMN_MAPPING_METADATA_PREFIX + "id"
val COLUMN_MAPPING_PHYSICAL_NAME_KEY = COLUMN_MAPPING_METADATA_PREFIX + "physicalName"

def generatePhysicalName: String = "col-" + UUID.randomUUID()

def getPhysicalName(field: StructField): String = {
if (field.metadata.contains(COLUMN_MAPPING_PHYSICAL_NAME_KEY)) {
field.metadata.getString(COLUMN_MAPPING_PHYSICAL_NAME_KEY)
} else {
field.name
}
}
}

object DeltaColumnMapping extends DeltaColumnMappingBase
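A short usage sketch of `getPhysicalName` and the metadata key above; the field name and the `MetadataBuilder` plumbing are illustrative.

import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField}

// A field carrying an explicit physical name in its metadata...
val mapped = StructField("email", StringType, nullable = true,
  new MetadataBuilder()
    .putString(DeltaColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY,
      DeltaColumnMapping.generatePhysicalName)
    .build())
DeltaColumnMapping.getPhysicalName(mapped)                           // the stored "col-<uuid>" value
// ...and one without, which falls back to the display name.
DeltaColumnMapping.getPhysicalName(StructField("email", StringType)) // "email"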

/**
* A trait for Delta column mapping modes.
*/
sealed trait DeltaColumnMappingMode {
def name: String
}

/**
* No mapping mode uses a column's display name as its true identifier to
* read and write data.
*
* This is the default mode and matches the behavior Delta has always had.
*/
case object NoMapping extends DeltaColumnMappingMode {
val name = "none"
}

object DeltaColumnMappingMode {
def apply(name: String): DeltaColumnMappingMode = {
name.toLowerCase(Locale.ROOT) match {
case NoMapping.name => NoMapping
case mode => throw DeltaErrors.unsupportedColumnMappingMode(mode)
}
}
}
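A brief usage sketch of the resolver above; only "none" is implemented here, so any other value surfaces the unsupported-mode error defined in DeltaErrors further down.

// Mode names are matched case-insensitively.
val mode: DeltaColumnMappingMode = DeltaColumnMappingMode("NONE")  // NoMapping
// DeltaColumnMappingMode("id") would throw DeltaErrors.unsupportedColumnMappingMode("id"),
// a ColumnMappingUnsupportedException, until additional mapping modes are implemented.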
42 changes: 27 additions & 15 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -17,31 +17,28 @@
package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import java.io.{FileNotFoundException, IOException}
import java.util.ConcurrentModificationException

import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
import io.delta.sql.DeltaSparkSessionExtension
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata}
import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.constraints.Constraints
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{InvariantViolationException, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sql.DeltaSparkSessionExtension
import org.apache.hadoop.fs.Path

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.{SparkConf, SparkEnv}

import java.io.{FileNotFoundException, IOException}
import java.util.ConcurrentModificationException


trait DocsPath {
@@ -122,6 +119,14 @@ object DeltaErrors
|get deleted based on retention settings.
""".stripMargin

/**
* We have plans to support more column mapping modes, but they are not implemented yet,
* so we error for now to be forward compatible with tables created in the future.
*/
def unsupportedColumnMappingMode(mode: String): Throwable =
new ColumnMappingUnsupportedException(s"The column mapping mode `$mode` is " +
s"not supported for this Delta version. Please upgrade if you want to use this mode.")

def deltaSourceIgnoreDeleteError(version: Long, removedFile: String): Throwable = {
new UnsupportedOperationException(
s"Detected deleted data (for example $removedFile) from streaming source at " +
@@ -1400,3 +1405,10 @@ class MetadataMismatchErrorBuilder {
throw new AnalysisException(bits.mkString("\n"))
}
}


/** Errors thrown around column mapping. */
class ColumnMappingUnsupportedException(msg: String)
extends UnsupportedOperationException(msg)
case class ColumnMappingException(msg: String, mode: DeltaColumnMappingMode)
extends AnalysisException(msg)
@@ -16,14 +16,11 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.constraints.Constraint
import org.apache.spark.sql.delta.util.JsonUtils

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{StructField, StructType}
@@ -316,6 +313,11 @@ object DeltaOperations {
}
}

/** Recorded when recomputing stats on the table. */
case class ComputeStats(predicate: Seq[String]) extends Operation("COMPUTE STATS") {
override val parameters: Map[String, Any] = Map(
"predicate" -> JsonUtils.toJson(predicate))
}

private def structFieldToMap(colPath: Seq[String], field: StructField): Map[String, Any] = {
Map(
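A hypothetical sketch of the new `ComputeStats` operation above as it would be recorded in the table history; the predicate string is illustrative.

val op = DeltaOperations.ComputeStats(Seq("part = '2023-03-07'"))
op.name                     // "COMPUTE STATS"
op.parameters("predicate")  // JSON-encoded predicate list, e.g. """["part = '2023-03-07'"]"""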
44 changes: 44 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala
@@ -0,0 +1,44 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.expressions.{SparkUserDefinedFunction, UserDefinedFunction}
import org.apache.spark.sql.functions.udf

object DeltaUDF {

/**
* A template for String => String udfs. It's used to create `SparkUserDefinedFunction` for
* String => String udfs without touching Scala Reflection, to reduce lock contention.
*/
private lazy val stringStringUdfTemplate =
udf[String, String]((x: String) => x).asInstanceOf[SparkUserDefinedFunction]

private def createUdfFromTemplate[R, T](
template: SparkUserDefinedFunction,
f: T => R): UserDefinedFunction = {
template.copy(
f = f,
inputEncoders = template.inputEncoders.map(_.map(_.copy())),
outputEncoder = template.outputEncoder.map(_.copy())
)
}

def stringStringUdf(f: String => String): UserDefinedFunction = {
udf[String, String](f)
}
}
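A small usage sketch of `DeltaUDF.stringStringUdf`; the column name and lower-casing function are hypothetical.

import org.apache.spark.sql.functions.col

val toPhysicalName = DeltaUDF.stringStringUdf(_.toLowerCase(java.util.Locale.ROOT))
// df.select(toPhysicalName(col("logicalName")))  // hypothetical DataFrame usage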
@@ -16,31 +16,27 @@

package org.apache.spark.sql.delta

import java.net.URI
import java.nio.file.FileAlreadyExistsException
import java.util.{ConcurrentModificationException, Locale}
import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.util.control.NonFatal

import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.delta.DeltaOperations.Operation
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{GenerateSymlinkManifest, PostCommitHook}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.delta.stats.{DeltaScan, DeltaScanGenerator}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.util.{Clock, Utils}

import java.nio.file.FileAlreadyExistsException
import java.util.ConcurrentModificationException
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashSet}
import scala.util.control.NonFatal

/** Record metrics about a successful commit. */
case class CommitStats(
/** The version read by the txn when it starts. */
@@ -152,6 +148,7 @@ object OptimisticTransaction {
* This trait is not thread-safe.
*/
trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReporting
with DeltaScanGenerator
with DeltaLogging {

import org.apache.spark.sql.delta.util.FileNames._
@@ -187,6 +184,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReporting
/** Stores the updated protocol (if any) that will result from this txn. */
protected var newProtocol: Option[Protocol] = None

override val snapshotToScan: Snapshot = snapshot
/**
* Tracks the first-access snapshots of other Delta logs read by this transaction.
* The snapshots are keyed by the log's unique id.
*/
protected var readSnapshots = new java.util.concurrent.ConcurrentHashMap[(String, Path), Snapshot]

protected val txnStartNano = System.nanoTime()
protected var commitStartNano = -1L
protected var commitInfo: CommitInfo = _
@@ -218,6 +222,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReporting
/** Start time of txn in nanoseconds */
def txnStartTimeNs: Long = txnStartNano

/** Gets the stats collector for the table at the snapshot this transaction has. */
def statsCollector: Column = snapshot.statsCollector

/**
* Returns the metadata for this transaction. The metadata refers to the metadata of the snapshot
* at the transaction's read version unless updated during the transaction.
@@ -363,6 +370,48 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReporting
}
}

/**
* Returns the [[DeltaScanGenerator]] for the given log, which will be used to generate
* [[DeltaScan]]s. Every time this method is called on a log, the returned generator will
* read a snapshot that is pinned on the first access for that log.
*
* Internally, if the given log is the same as the log associated with this
* transaction, then it returns this transaction; otherwise it returns a snapshot of the
* given log.
*/
def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator = {
if (index.deltaLog.isSameLogAs(deltaLog)) {
this
} else {
if (spark.conf.get(DeltaSQLConf.DELTA_SNAPSHOT_ISOLATION)) {
readSnapshots.computeIfAbsent(index.deltaLog.compositeId, _ => {
// Will be called only when the log is accessed the first time
index.getSnapshot
})
} else {
index.getSnapshot
}
}
}
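A minimal sketch of the pin-on-first-access pattern that `readSnapshots` and `getDeltaScanGenerator` rely on above; `PinOnFirstAccess` is a hypothetical name, not part of this commit.

import java.util.concurrent.ConcurrentHashMap

// The first lookup for a key loads and stores the value; later lookups return the
// pinned value, which is what gives the transaction a stable snapshot per Delta log.
final class PinOnFirstAccess[K, V] {
  private val pinned = new ConcurrentHashMap[K, V]()
  def getOrPin(key: K)(load: => V): V =
    pinned.computeIfAbsent(key, _ => load)  // `load` runs only on the first access for `key`
}

// Hypothetical usage, mirroring the code above:
//   val cache = new PinOnFirstAccess[(String, Path), Snapshot]
//   cache.getOrPin(index.deltaLog.compositeId)(index.getSnapshot)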

/** Returns a [[DeltaScan]] based on the given filters and projections. */
override def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = {
val scan = snapshot.filesForScan(projection, filters)
val partitionFilters = filters.filter { f =>
DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark)
}
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
readFiles ++= scan.files
scan
}

override def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame = {
val metadata = snapshot.filesWithStatsForScan(partitionFilters)
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
withFilesRead(filterFiles(partitionFilters))
metadata
}

/** Returns files matching the given predicates. */
def filterFiles(): Seq[AddFile] = filterFiles(Seq(Literal.TrueLiteral))

@@ -383,6 +432,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReporting
readTheWholeTable = true
}

/** Mark the given files as read within this transaction. */
def withFilesRead(files: Seq[AddFile]): Unit = {
readFiles ++= files
}

/**
* Returns the latest version that has committed for the idempotent transaction with given `id`.
*/
@@ -38,6 +38,6 @@ trait PartitionFiltering { self: Snapshot =>
partitionFilters).as[AddFile].collect()
}

DeltaScan(version = version, files, null, null, null)(null, null, null, null)
DeltaScan(version = version, files, null, null, null)(null, null, null, null, 0, null)
}
}