Skip to content

Commit

Permalink
[SC-20324] Refactored DeltaTable
Browse files (browse the repository at this point in the history)
Refactored DeltaTable to take a DeltaLog as an explicit parameter alongside the DataFrame, instead of deriving the DeltaLog from the DataFrame's analyzed plan.

Closes #5934 from rahulsmahadev/deltatable_refacotr.

Authored-by: Rahul Mahadev <rahul.mahadev@databricks.com>
Signed-off-by: Rahul Mahadev <rahul.mahadev@databricks.com>
GitOrigin-RevId: 663ab2070c58e900ca3dd73452696281cbff1fc7
  • Loading branch information
rahulsmahadev authored and zsxwing committed Aug 9, 2019
1 parent 3100b6d commit 75439ff
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 26 deletions.
14 changes: 7 additions & 7 deletions src/main/scala/io/delta/tables/DeltaTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.InterfaceStability._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases

/**
* :: Evolving ::
Expand All @@ -38,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
* @since 0.3.0
*/
@Evolving
class DeltaTable private(df: Dataset[Row])
class DeltaTable private(df: Dataset[Row], deltaLog: DeltaLog)
extends DeltaTableOperations {

/**
Expand All @@ -50,7 +49,7 @@ class DeltaTable private(df: Dataset[Row])
* @since 0.3.0
*/
@Evolving
def as(alias: String): DeltaTable = new DeltaTable(df.as(alias))
def as(alias: String): DeltaTable = new DeltaTable(df.as(alias), deltaLog)

/**
* :: Evolving ::
Expand Down Expand Up @@ -107,7 +106,7 @@ class DeltaTable private(df: Dataset[Row])
*/
@Evolving
def history(limit: Int): DataFrame = {
executeHistory(Some(limit))
executeHistory(deltaLog, Some(limit))
}

/**
Expand All @@ -120,7 +119,7 @@ class DeltaTable private(df: Dataset[Row])
*/
@Evolving
def history(): DataFrame = {
executeHistory(None)
executeHistory(deltaLog, None)
}

/**
Expand Down Expand Up @@ -492,7 +491,7 @@ object DeltaTable {
* Private method for internal usage only. Do not call this directly.
*/
@Unstable
def apply(df: DataFrame): DeltaTable = new DeltaTable(df)
def apply(df: DataFrame, deltaLog: DeltaLog): DeltaTable = new DeltaTable(df, deltaLog)

/**
* :: Evolving ::
Expand Down Expand Up @@ -524,7 +523,8 @@ object DeltaTable {
@Evolving
def forPath(sparkSession: SparkSession, path: String): DeltaTable = {
if (DeltaTableUtils.isDeltaTable(sparkSession, new Path(path))) {
new DeltaTable(sparkSession.read.format("delta").load(path))
new DeltaTable(sparkSession.read.format("delta").load(path),
DeltaLog.forTable(sparkSession, path))
} else {
throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(path)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import org.apache.spark.sql.delta.util.AnalysisHelper
import io.delta.tables.DeltaTable

import org.apache.spark.sql.{functions, Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -65,15 +64,13 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
target.toDF.queryExecution.analyzed, updateColumns, updateExpressions, condition)
}

protected def executeHistory(limit: Option[Int]): DataFrame = {
protected def executeHistory(deltaLog: DeltaLog, limit: Option[Int]): DataFrame = {
val history = new DeltaHistoryManager(deltaLog)
val spark = self.toDF.sparkSession
spark.createDataFrame(history.getHistory(limit))
}

protected def executeUpdate(
set: Map[String, Column],
condition: Option[Column]): Unit = {
protected def executeUpdate(set: Map[String, Column], condition: Option[Column]): Unit = {
val setColumns = set.map{ case (col, expr) => (col, expr) }.toSeq

// Current UPDATE does not support subquery,
Expand All @@ -100,15 +97,9 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
protected def executeVacuum(
deltaLog: DeltaLog,
retentionHours: Option[Double]): DataFrame = {
val sparkSession = self.toDF.sparkSession
VacuumCommand.gc(sparkSession, deltaLog, false, retentionHours)
sparkSession.emptyDataFrame
}

protected lazy val deltaLog = (EliminateSubqueryAliases(self.toDF.queryExecution.analyzed) match {
case DeltaFullTable(tahoeFileIndex) =>
tahoeFileIndex
}).deltaLog

protected lazy val sparkSession: SparkSession = self.toDF.sparkSession
protected def sparkSession = self.toDF.sparkSession
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class DeleteScalaSuite extends DeleteSuiteBase {
val path = tableNameOrPath.stripPrefix("delta.`").stripSuffix("`")
io.delta.tables.DeltaTable.forPath(spark, path)
} else {
io.delta.tables.DeltaTable(spark.table(tableNameOrPath))
io.delta.tables.DeltaTable(spark.table(tableNameOrPath),
DeltaLog.forTable(spark, tableNameOrPath))
}
optionalAlias.map(table.as(_)).getOrElse(table)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ trait DescribeDeltaHistorySuiteBase
}
}

test("logging and limit") {
testWithFlag("logging and limit") {
val tempDir = Utils.createTempDir().toString
Seq(1, 2, 3).toDF().write.format("delta").save(tempDir)
Seq(4, 5, 6).toDF().write.format("delta").mode("overwrite").save(tempDir)
Expand Down Expand Up @@ -168,7 +168,7 @@ trait DescribeDeltaHistorySuiteBase
Seq($"operation", $"operationParameters.predicate"))
}

test("old and new writers") {
testWithFlag("old and new writers") {
val tempDir = Utils.createTempDir().toString
withSQLConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED.key -> "false") {
Seq(1, 2, 3).toDF().write.format("delta").save(tempDir.toString)
Expand All @@ -183,7 +183,7 @@ trait DescribeDeltaHistorySuiteBase
checkLastOperation(tempDir, Seq("WRITE", "Append"))
}

test("order history by version") {
testWithFlag("order history by version") {
val tempDir = Utils.createTempDir().toString

withSQLConf(DeltaSQLConf.DELTA_COMMIT_INFO_ENABLED.key -> "false") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ class MergeIntoScalaSuite extends MergeIntoSuiteBase {
val path = nameOrPath.stripPrefix("delta.`").stripSuffix("`")
io.delta.tables.DeltaTable.forPath(spark, path)
} else {
io.delta.tables.DeltaTable(spark.table(nameOrPath))
io.delta.tables.DeltaTable(spark.table(nameOrPath), DeltaLog.forTable(spark, nameOrPath))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class UpdateScalaSuite extends UpdateSuiteBase {
val path = tableNameOrPath.stripPrefix("delta.`").stripSuffix("`")
io.delta.tables.DeltaTable.forPath(spark, path)
} else {
io.delta.tables.DeltaTable(spark.table(tableNameOrPath))
io.delta.tables.DeltaTable(spark.table(tableNameOrPath),
DeltaLog.forTable(spark, tableNameOrPath))
}
optionalAlias.map(table.as(_)).getOrElse(table)
}
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "0.3.1-SNAPSHOT"
version in ThisBuild := "0.2.1-SNAPSHOT"

0 comments on commit 75439ff

Please sign in to comment.