From 444bdfcdc8ea976e776eb20f4822e1ea2541446a Mon Sep 17 00:00:00 2001 From: Ryan Johnson Date: Wed, 20 Sep 2023 15:58:01 -0700 Subject: [PATCH] [Spark] Define OptimisticTransaction.catalogTable #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description As part of implementing https://github.com/delta-io/delta/issues/2052, `OptimisticTransaction` needs the ability to track a `CatalogTable` for the table it updates. That way, post-commit hooks can reliably identify catalog-based tables and make appropriate catalog calls in response to table changes. For now, we just define the new field, and add a new catalog-aware overload of `DeltaLog.startTransaction` that leverages it. Future work will start updating call sites to actually pass catalog information when starting a transaction. The new field is currently not used, so nothing really to test. Existing unit tests verify the existing overloads are not broken by the change. ## Does this PR introduce _any_ user-facing changes? No Closes https://github.com/delta-io/delta/pull/2083 GitOrigin-RevId: 03f2d2732a939cdd9ee2e56e07b23e8be00bcb6f --- .../org/apache/spark/sql/delta/DeltaLog.scala | 26 +++++++++++++++---- .../sql/delta/OptimisticTransaction.scala | 8 ++++++ .../sql/delta/catalog/DeltaTableV2.scala | 2 +- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 23bdba1d15b..ce99d057778 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -184,17 +184,33 @@ class DeltaLog private( * ------------------ */ /** - * Returns a new [[OptimisticTransaction]] that can be used to read the current state of the - * log and then commit updates. 
The reads and updates will be checked for logical conflicts - with any concurrent writes to the log. + * Returns a new [[OptimisticTransaction]] that can be used to read the current state of the log + * and then commit updates. The reads and updates will be checked for logical conflicts with any + * concurrent writes to the log, and post-commit hooks can be used to notify the table's catalog + * of schema changes, etc. * * Note that all reads in a transaction must go through the returned transaction object, and not * directly to the [[DeltaLog]] otherwise they will not be checked for conflicts. + * + * @param catalogTableOpt The [[CatalogTable]] for the table this transaction updates. Passing + * None asserts this is a path-based table with no catalog entry. + * + * @param snapshotOpt The [[Snapshot]] this transaction should use, if not latest. */ - def startTransaction(): OptimisticTransaction = startTransaction(None) + def startTransaction( + catalogTableOpt: Option[CatalogTable], + snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = { + new OptimisticTransaction(this, catalogTableOpt, snapshotOpt) + } + + /** Legacy/compat overload that does not require catalog table information. Avoid prod use. */ + @deprecated("Please use the CatalogTable overload instead", "3.0") + def startTransaction(): OptimisticTransaction = startTransaction(snapshotOpt = None) + /** Legacy/compat overload that does not require catalog table information. Avoid prod use. 
*/ + @deprecated("Please use the CatalogTable overload instead", "3.0") def startTransaction(snapshotOpt: Option[Snapshot]): OptimisticTransaction = { - new OptimisticTransaction(this, snapshotOpt.getOrElse(update())) + startTransaction(catalogTableOpt = None, snapshotOpt) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index dcb768df20b..b3f47ad7b00 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.types.StructType @@ -138,9 +139,15 @@ private[delta] case class DeltaTableReadPredicate( */ class OptimisticTransaction( override val deltaLog: DeltaLog, + override val catalogTable: Option[CatalogTable], override val snapshot: Snapshot) extends OptimisticTransactionImpl with DeltaLogging { + def this( + deltaLog: DeltaLog, + catalogTable: Option[CatalogTable], + snapshotOpt: Option[Snapshot] = None) = + this(deltaLog, catalogTable, snapshotOpt.getOrElse(deltaLog.update())) } object OptimisticTransaction { @@ -206,6 +213,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite import org.apache.spark.sql.delta.util.FileNames._ val deltaLog: DeltaLog + val catalogTable: Option[CatalogTable] val snapshot: Snapshot def clock: Clock = deltaLog.clock diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index d3eb8ac8080..01464824683 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -218,7 +218,7 @@ case class DeltaTableV2( * was provided. */ def startTransaction(snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = { - deltaLog.startTransaction(snapshotOpt) + deltaLog.startTransaction(catalogTable, snapshotOpt) } /**