diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingFileIndex.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingFileIndex.scala
index 0e9e45c0b04..60337de9331 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingFileIndex.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingFileIndex.scala
@@ -212,7 +212,7 @@ case class DeltaSharingFileIndex(
       partitionFilters: Seq[Expression],
       dataFilters: Seq[Expression]): TahoeLogFileIndex = {
     val deltaLog = fetchFilesAndConstructDeltaLog(partitionFilters, dataFilters, None)
-    TahoeLogFileIndex(params.spark, deltaLog)
+    TahoeLogFileIndex(params.spark, deltaLog, catalogTableOpt = None)
   }
 
   override def listFiles(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index bf20d55af95..9fca7192e43 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -582,7 +582,7 @@ class DeltaLog private(
     }
 
     val fileIndex = TahoeLogFileIndex(
-      spark, this, dataPath, snapshotToUse, partitionFilters, isTimeTravelQuery)
+      spark, this, dataPath, snapshotToUse, catalogTableOpt, partitionFilters, isTimeTravelQuery)
     var bucketSpec: Option[BucketSpec] = None
 
     val r = buildHadoopFsRelationWithFileIndex(snapshotToUse, fileIndex, bucketSpec = bucketSpec)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
index caf9d657922..fa4012be663 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, Literal}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.types.StructType
@@ -244,6 +245,7 @@ case class TahoeLogFileIndex(
     override val deltaLog: DeltaLog,
     override val path: Path,
     snapshotAtAnalysis: SnapshotDescriptor,
+    catalogTableOpt: Option[CatalogTable],
     partitionFilters: Seq[Expression],
     isTimeTravelQuery: Boolean)
   extends TahoeFileIndex(spark, deltaLog, path) {
@@ -253,6 +255,7 @@ case class TahoeLogFileIndex(
       deltaLog: DeltaLog,
       path: Path,
       snapshotAtAnalysis: Snapshot,
+      catalogTableOpt: Option[CatalogTable],
       partitionFilters: Seq[Expression] = Nil,
       isTimeTravelQuery: Boolean = false
   ) = this (
@@ -261,6 +264,7 @@ case class TahoeLogFileIndex(
       path,
       if (isTimeTravelQuery) snapshotAtAnalysis
       else new ShallowSnapshotDescriptor(snapshotAtAnalysis),
+      catalogTableOpt,
       partitionFilters,
       isTimeTravelQuery)
 
@@ -288,7 +292,7 @@ case class TahoeLogFileIndex(
     if (isTimeTravelQuery) {
       snapshotAtAnalysis.asInstanceOf[Snapshot]
     } else {
-      deltaLog.update(stalenessAcceptable = true)
+      deltaLog.update(stalenessAcceptable = true, catalogTableOpt = catalogTableOpt)
     }
   }
 
@@ -366,19 +370,23 @@ case class TahoeLogFileIndex(
 }
 
 object TahoeLogFileIndex {
-  def apply(spark: SparkSession, deltaLog: DeltaLog): TahoeLogFileIndex =
-    new TahoeLogFileIndex(spark, deltaLog, deltaLog.dataPath, deltaLog.unsafeVolatileSnapshot)
+  def apply(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      catalogTableOpt: Option[CatalogTable]): TahoeLogFileIndex =
+    new TahoeLogFileIndex(
+      spark, deltaLog, deltaLog.dataPath, deltaLog.unsafeVolatileSnapshot, catalogTableOpt)
 
   def apply(
       spark: SparkSession,
       deltaLog: DeltaLog,
       path: Path,
       snapshotAtAnalysis: Snapshot,
+      catalogTableOpt: Option[CatalogTable],
       partitionFilters: Seq[Expression] = Nil,
       isTimeTravelQuery: Boolean = false): TahoeLogFileIndex =
     new TahoeLogFileIndex(
-      spark, deltaLog, path, snapshotAtAnalysis, partitionFilters, isTimeTravelQuery
-    )
+      spark, deltaLog, path, snapshotAtAnalysis, catalogTableOpt, partitionFilters, isTimeTravelQuery)
 }
 
 /**
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
index f24e1bd0143..f5001f69182 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, Protocol}
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
 import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
 import org.apache.spark.sql.delta.coordinatedcommits.TableCommitCoordinatorClient
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
 import org.apache.spark.sql.delta.hooks.AutoCompact
 import org.apache.spark.sql.delta.stats.StatisticsCollection
 import io.delta.storage.commit.{CommitResponse, GetCommitsResponse, UpdatedActions}
@@ -181,6 +182,12 @@ object DeltaTestImplicits {
     def snapshot: Snapshot = deltaTable.initialSnapshot
   }
 
+  implicit class TahoeLogFileIndexObjectTestHelper(index: TahoeLogFileIndex.type) {
+    def apply(spark: SparkSession, deltaLog: DeltaLog): TahoeLogFileIndex = {
+      index.apply(spark, deltaLog, catalogTableOpt = None)
+    }
+  }
+
   implicit class AutoCompactObjectTestHelper(ac: AutoCompact.type) {
     private[delta] def compact(
       spark: SparkSession,
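
In short, the diff threads an optional CatalogTable from every TahoeLogFileIndex construction site down to deltaLog.update, so a non-time-travel snapshot refresh can identify the table through the catalog rather than only by path; path-based callers such as DeltaSharingFileIndex simply pass None. Below is a minimal sketch of how a caller might use the new parameter after this change. The table name, database, and path are hypothetical, and the DeltaLog.forTable overloads are assumed from the existing Delta Spark API; this is an illustration, not a call site added by the diff.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.files.TahoeLogFileIndex

val spark: SparkSession = SparkSession.active

// Path-based access: no catalog entry exists, so pass catalogTableOpt = None
// (mirrors the DeltaSharingFileIndex call site above).
val pathLog = DeltaLog.forTable(spark, new Path("/tmp/delta/events"))
val pathIndex = TahoeLogFileIndex(spark, pathLog, catalogTableOpt = None)

// Catalog-based access: resolve the CatalogTable and thread it through, so
// getSnapshot can hand it to deltaLog.update when refreshing the snapshot.
val catalogTable =
  spark.sessionState.catalog.getTableMetadata(TableIdentifier("events", Some("default")))
val catalogLog = DeltaLog.forTable(spark, catalogTable)
val catalogIndex = TahoeLogFileIndex(spark, catalogLog, Some(catalogTable))

The test-only implicit in DeltaTestImplicits keeps the old two-argument apply available to existing suites by defaulting catalogTableOpt to None, so only production call sites had to change.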