[Spark] Define OptimisticTransaction.catalogTable
#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

As part of implementing delta-io#2052, `OptimisticTransaction` needs the ability to track a `CatalogTable` for the table it updates. That way, post-commit hooks can reliably identify catalog-based tables and make appropriate catalog calls in response to table changes.
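
As a rough sketch of the intent (illustrative assumptions, not code from this change: the `txn` value and the branching logic are placeholders), a post-commit hook could branch on the new field like so:

```scala
// Illustrative sketch only; assumes an OptimisticTransaction `txn` in scope.
// The branching below is hypothetical, not Delta's actual hook logic.
txn.catalogTable match {
  case Some(table) =>
    // Catalog-backed table: catalog calls are safe, e.g. pushing a schema
    // change back to the entry identified by `table.identifier`.
    println(s"Would update catalog entry for ${table.identifier}")
  case None =>
    // Path-based table with no catalog entry: nothing to notify.
    println("Path-based table; no catalog to update")
}
```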

For now, we just define the new field and add a new catalog-aware overload of `DeltaLog.startTransaction` that leverages it. Future work will update call sites to actually pass catalog information when starting a transaction.
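
To illustrate, an updated call site might eventually look roughly like this (a sketch under assumptions: `spark`, `deltaLog`, and the `events` table name are placeholders, not part of this PR):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Catalog-backed table: fetch its CatalogTable and hand it to the new
// overload so post-commit hooks can see it.
val catalogTable =
  spark.sessionState.catalog.getTableMetadata(TableIdentifier("events"))
val txn = deltaLog.startTransaction(Some(catalogTable))

// Path-based table with no catalog entry: callers assert that explicitly.
val pathTxn = deltaLog.startTransaction(catalogTableOpt = None)
```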

The new field is not yet used, so there is nothing new to test.
Existing unit tests verify that the existing overloads are not broken by the change.

## Does this PR introduce _any_ user-facing changes?

No

Closes delta-io#2083

GitOrigin-RevId: 03f2d2732a939cdd9ee2e56e07b23e8be00bcb6f
ryan-johnson-databricks authored and allisonport-db committed Sep 21, 2023
1 parent 60a3c03 commit 444bdfc
Showing 3 changed files with 30 additions and 6 deletions.
26 changes: 21 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -184,17 +184,33 @@ class DeltaLog private(
    * ------------------ */
 
   /**
-   * Returns a new [[OptimisticTransaction]] that can be used to read the current state of the
-   * log and then commit updates. The reads and updates will be checked for logical conflicts
-   * with any concurrent writes to the log.
+   * Returns a new [[OptimisticTransaction]] that can be used to read the current state of the log
+   * and then commit updates. The reads and updates will be checked for logical conflicts with any
+   * concurrent writes to the log, and post-commit hooks can be used to notify the table's catalog
+   * of schema changes, etc.
    *
    * Note that all reads in a transaction must go through the returned transaction object, and not
    * directly to the [[DeltaLog]] otherwise they will not be checked for conflicts.
+   *
+   * @param catalogTableOpt The [[CatalogTable]] for the table this transaction updates. Passing
+   *                        None asserts this is a path-based table with no catalog entry.
+   *
+   * @param snapshotOpt The [[Snapshot]] this transaction should use, if not latest.
    */
-  def startTransaction(): OptimisticTransaction = startTransaction(None)
+  def startTransaction(
+      catalogTableOpt: Option[CatalogTable],
+      snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
+    new OptimisticTransaction(this, catalogTableOpt, snapshotOpt)
+  }
+
+  /** Legacy/compat overload that does not require catalog table information. Avoid prod use. */
+  @deprecated("Please use the CatalogTable overload instead", "3.0")
+  def startTransaction(): OptimisticTransaction = startTransaction(snapshotOpt = None)
+
+  /** Legacy/compat overload that does not require catalog table information. Avoid prod use. */
+  @deprecated("Please use the CatalogTable overload instead", "3.0")
   def startTransaction(snapshotOpt: Option[Snapshot]): OptimisticTransaction = {
-    new OptimisticTransaction(this, snapshotOpt.getOrElse(update()))
+    startTransaction(catalogTableOpt = None, snapshotOpt)
   }
 
   /**
8 changes: 8 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -42,6 +42,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.types.StructType
@@ -138,9 +139,15 @@ private[delta] case class DeltaTableReadPredicate(
   */
 class OptimisticTransaction(
     override val deltaLog: DeltaLog,
+    override val catalogTable: Option[CatalogTable],
     override val snapshot: Snapshot)
   extends OptimisticTransactionImpl
   with DeltaLogging {
+  def this(
+      deltaLog: DeltaLog,
+      catalogTable: Option[CatalogTable],
+      snapshotOpt: Option[Snapshot] = None) =
+    this(deltaLog, catalogTable, snapshotOpt.getOrElse(deltaLog.update()))
 }
 
 object OptimisticTransaction {
@@ -206,6 +213,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
   import org.apache.spark.sql.delta.util.FileNames._
 
   val deltaLog: DeltaLog
+  val catalogTable: Option[CatalogTable]
   val snapshot: Snapshot
   def clock: Clock = deltaLog.clock
 
2 changes: 1 addition & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -218,7 +218,7 @@ case class DeltaTableV2(
    * was provided.
    */
   def startTransaction(snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
-    deltaLog.startTransaction(snapshotOpt)
+    deltaLog.startTransaction(catalogTable, snapshotOpt)
   }
 
   /**
