[Spark] Support predicate pushdown in scans with DVs #2982

Merged

Changes from 1 commit

flush

First sane version without isRowDeleted
andreaschat-db committed Apr 26, 2024
commit 3938944c2f20c487b22df8bf41970b43bc897449
@@ -61,8 +61,8 @@ case class DeltaParquetFileFormat(
extends ParquetFileFormat {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
require(!isSplittable && disablePushDowns,
"Wrong arguments for Delta table scan with deletion vectors")
// require(!isSplittable && disablePushDowns,
// "Wrong arguments for Delta table scan with deletion vectors")
}

TypeWidening.assertTableReadable(protocol, metadata)
@@ -283,8 +283,8 @@ case class DeltaParquetFileFormat(

def disableSplittingAndPushdown(tablePath: String): DeltaParquetFileFormat = {
this.copy(
isSplittable = false,
disablePushDowns = true,
// isSplittable = true,
// disablePushDowns = false,
tablePath = Some(tablePath))
}

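The net effect of the two `DeltaParquetFileFormat` changes above is that a DV read no longer forces `isSplittable = false` / `disablePushDowns = true`; only the table path is recorded. A minimal sketch of the behaviour this is meant to enable, where the path, table layout, and session config are illustrative assumptions rather than anything taken from the PR:

```scala
// Hypothetical end-to-end check, not part of this PR: with pushdown no longer
// disabled for DV reads, a data filter on a DV-enabled table should still be
// pushed to the Parquet scan while deleted rows are dropped via the DV.
import org.apache.spark.sql.SparkSession

object DvPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("dv-pushdown-sketch")
      .master("local[2]")
      // assumes the delta-spark artifact is on the classpath
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
    import spark.implicits._

    val path = "/tmp/dv_pushdown_demo" // illustrative location
    spark.range(0, 1000).toDF("id").write.format("delta").save(path)
    spark.sql(s"ALTER TABLE delta.`$path` " +
      "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
    spark.sql(s"DELETE FROM delta.`$path` WHERE id = 600")

    val df = spark.read.format("delta").load(path).filter($"id" > 500)
    df.explain() // `id > 500` should appear under PushedFilters in the scan node
    assert(df.count() == 498) // 501..999, minus the deleted id 600
    spark.stop()
  }
}
```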
@@ -71,8 +71,8 @@ object ScanWithDeletionVectors {
// If the table has no DVs enabled, no change needed
if (!deletionVectorsReadable(index.protocol, index.metadata)) return None

// require(!index.isInstanceOf[TahoeLogFileIndex],
// "Cannot work with a non-pinned table snapshot of the TahoeFileIndex")
require(!index.isInstanceOf[TahoeLogFileIndex],
"Cannot work with a non-pinned table snapshot of the TahoeFileIndex")

// If the table has no DVs enabled, no change needed
if (!deletionVectorsReadable(index.protocol, index.metadata)) return None
@@ -141,7 +141,7 @@ object ScanWithDeletionVectors {
val skipRowColumnRef = skipRowColumnRefs.head

val keepRow = DeltaUDF.booleanFromByte( _ == RowIndexFilter.KEEP_ROW_VALUE)
.asNondeterministic() // To avoid constant folding the filter based on stats.
// .asNondeterministic() // To avoid constant folding the filter based on stats.

val filterExp = keepRow(new Column(skipRowColumnRef)).expr
Filter(filterExp, newScan)
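Dropping `.asNondeterministic()` matters for pushdown: Spark will not combine or reorder predicates across a non-deterministic filter, so keeping the keep-row UDF deterministic is what lets the query's own data filters continue down toward the Parquet scan. A small standalone illustration of that optimizer behaviour, using a plain UDF as a stand-in for `DeltaUDF.booleanFromByte`; the column names and the keep-row marker value are assumptions for this sketch:

```scala
// Conceptual illustration (not Delta code): with a deterministic keep-row
// filter the optimizer may merge/push the data predicate past it; with the
// non-deterministic variant, `id > 1` stays above the UDF filter.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object KeepRowFilterSketch {
  // 0 stands in for RowIndexFilter.KEEP_ROW_VALUE in this sketch.
  val keepRow = udf((marker: Byte) => marker == 0.toByte)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("keep-row-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((1L, 0.toByte), (2L, 1.toByte), (3L, 0.toByte))
      .toDF("id", "skip_row_marker")

    // Deterministic UDF: compare how the two filters are combined in the plan.
    df.filter(keepRow(col("skip_row_marker"))).filter($"id" > 1).explain()

    // Non-deterministic variant: the `id > 1` predicate remains above it.
    val keepRowNonDet = keepRow.asNondeterministic()
    df.filter(keepRowNonDet(col("skip_row_marker"))).filter($"id" > 1).explain()

    spark.stop()
  }
}
```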
@@ -48,7 +48,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils => SparkUtils}
* Contains utility classes and method for performing DML operations with Deletion Vectors.
*/
object DMLWithDeletionVectorsHelper extends DeltaCommand {
val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE", "MERGE")
val SUPPORTED_DML_COMMANDS: Seq[String] = Seq("DELETE", "UPDATE")

/**
* Creates a DataFrame that can be used to scan for rows matching the condition in the given
@@ -106,8 +106,8 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
val newProjectList = projectList ++ Seq(rowIndexCol, fileMetadataCol)
p.copy(projectList = newProjectList)
}
// newTarget
target
newTarget
// target
}

/**
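The restored `newTarget` plan is the variant whose project list carries the row-index and file-metadata columns, which is what allows DELETE/UPDATE to record deleted positions in a deletion vector instead of rewriting data files. A rough illustration of the idea using Spark's public `_metadata.file_path` column; Delta's actual plan uses its own internal row-index and metadata columns, and the names and paths below are placeholders:

```scala
// Sketch only: tag each scanned row with the file it came from, then group a
// DELETE condition's matches by file, mirroring the "file X, rows {i, j, ...}"
// bookkeeping a deletion-vector writer needs.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object RowTaggingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("row-tagging-sketch").getOrCreate()

    val dir = "/tmp/row_tagging_demo" // illustrative location
    spark.range(0, 100).write.mode("overwrite").parquet(dir)

    // _metadata.file_path is Spark's built-in hidden file metadata column;
    // Delta's real plan also carries an internal row-index column here.
    val tagged = spark.read.parquet(dir)
      .select(col("id"), col("_metadata.file_path").as("source_file"))

    // Rows matching a DELETE condition, grouped by the file they live in.
    tagged.filter(col("id") === 42)
      .groupBy("source_file")
      .count()
      .show(truncate = false)

    spark.stop()
  }
}
```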
@@ -16,6 +16,9 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.FILES_MAX_PARTITION_BYTES

import scala.collection.mutable.ArrayBuffer

// scalastyle:off import.ordering.noEmptyLine
@@ -287,13 +290,27 @@ class TightBoundsSuite

test("TEST") {
withTempDeltaTable(
dataDF = spark.range(0, 10, 1, 1).toDF("id"),
// .repartition(1)
dataDF = spark.range(0, 50000000, 1, 1).toDF("id"),
// dataDF = spark.range(0, 100000000, 1, 1).toDF("id"),
enableDVs = true
) { (targetTable, targetLog) =>
targetTable().delete("id == 2")

val a = targetTable().toDF.filter("id != 1").collect()
val b = 1
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString,
SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
targetTable().delete("id == 40000000")

// val d = targetTable().toDF.filter("id != 1").queryExecution.executedPlan
// .filter("id != 1")
val a = targetTable().toDF.filter("id != 1").collect()
val c = targetLog.update().allFiles.collect()
val b = 1
assert(a.length === 49999999)

// a(40000000).getLong(0)
assert(a(40000000).getLong(0) === 40000000)
// assert(!a.map(_.getLong(0)).toSeq.contains(40000000))
// assert(a === Seq(0, 100000000).drop(2))
}
}
}

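For reference, a scaled-down, self-contained sketch of the scenario this work-in-progress test exercises: one large data file is split across several read partitions (small `spark.sql.files.maxPartitionBytes`), a single row is deleted via a deletion vector, and a scan with an additional data filter must return exactly the surviving rows. The sizes, path, and configs are illustrative assumptions, not the values used in the suite:

```scala
// Sketch of a split DV scan with a data filter; counts below are for this
// sketch only and do not mirror the asserts in the WIP test above.
import org.apache.spark.sql.SparkSession

object SplitDvScanSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("split-dv-scan-sketch")
      .master("local[2]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      // Small split size so the single data file spans multiple partitions.
      .config("spark.sql.files.maxPartitionBytes", "1MB")
      .getOrCreate()

    val path = "/tmp/split_dv_scan_demo" // illustrative location
    spark.range(0, 1000000, 1, 1).toDF("id") // one input partition => one file
      .write.format("delta").save(path)
    spark.sql(s"ALTER TABLE delta.`$path` " +
      "SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
    spark.sql(s"DELETE FROM delta.`$path` WHERE id = 400000")

    val rows = spark.read.format("delta").load(path)
      .filter("id != 1")
      .collect()

    // 1,000,000 rows minus the deleted id 400000 and the filtered-out id 1.
    assert(rows.length == 999998)
    assert(!rows.map(_.getLong(0)).contains(400000L))
    spark.stop()
  }
}
```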