[Spark] Check DV cardinality when reading DVs
Update DeletionVectorStore to verify that the cardinality of each DV we read from a file matches the cardinality recorded in its descriptor.

Closes #2079

GitOrigin-RevId: c6b3c7a3088bcb05e4541e9e073249c2886b16c9
cstavr authored and vkorukanti committed Sep 28, 2023
1 parent aa12854 commit ccf7fbc
Showing 3 changed files with 33 additions and 2 deletions.
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -556,6 +556,12 @@
     ],
     "sqlState" : "0AKDE"
   },
+  "DELTA_DELETION_VECTOR_CARDINALITY_MISMATCH" : {
+    "message" : [
+      "Deletion vector integrity check failed. Encountered a cardinality mismatch."
+    ],
+    "sqlState" : "XXKDS"
+  },
   "DELTA_DELETION_VECTOR_CHECKSUM_MISMATCH" : {
     "message" : [
       "Could not verify deletion vector integrity, CRC checksum verification failed."
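Note: under Spark's error-class framework, the class name is prepended to the message template, so a failing check surfaces to users roughly as follows (exact formatting varies by Spark version):

    [DELTA_DELETION_VECTOR_CARDINALITY_MISMATCH] Deletion vector integrity check failed. Encountered a cardinality mismatch.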
7 changes: 7 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2872,6 +2872,13 @@ trait DeltaErrorsBase
errorClass = "DELTA_CANNOT_RECONSTRUCT_PATH_FROM_URI",
messageParameters = Array(uri))

def deletionVectorCardinalityMismatch(): Throwable = {
new DeltaChecksumException(
errorClass = "DELTA_DELETION_VECTOR_CARDINALITY_MISMATCH",
messageParameters = Array.empty,
pos = 0
)
}

def deletionVectorSizeMismatch(): Throwable = {
new DeltaChecksumException(
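A hypothetical ScalaTest-style check of the new factory (not part of this commit). It assumes the thrown DeltaChecksumException exposes its error class via Spark's SparkThrowable interface and that the DeltaErrors object mixes in DeltaErrorsBase:

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.delta.DeltaErrors
import org.scalatest.funsuite.AnyFunSuite

class DeletionVectorErrorSuite extends AnyFunSuite {
  test("cardinality mismatch carries the new error class") {
    // The factory should produce an exception tagged with the new error class.
    val e = intercept[Exception] {
      throw DeltaErrors.deletionVectorCardinalityMismatch()
    }
    assert(e.asInstanceOf[SparkThrowable].getErrorClass ==
      "DELTA_DELETION_VECTOR_CARDINALITY_MISMATCH")
  }
}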
22 changes: 20 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/StoredBitmap.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.deletionvectors

 import java.io.{IOException, ObjectInputStream}
 
+import org.apache.spark.sql.delta.DeltaErrorsBase
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.Path
@@ -60,19 +62,35 @@ trait StoredBitmap {
  */
 case class DeletionVectorStoredBitmap(
     dvDescriptor: DeletionVectorDescriptor,
-    tableDataPath: Option[Path] = None) extends StoredBitmap {
+    tableDataPath: Option[Path] = None
+  ) extends StoredBitmap with DeltaLogging with DeltaErrorsBase {
   require(tableDataPath.isDefined || !dvDescriptor.isOnDisk,
     "Table path is required for on-disk deletion vectors")
 
   override def load(dvStore: DeletionVectorStore): RoaringBitmapArray = {
-    if (isEmpty) {
+    val bitmap = if (isEmpty) {
       new RoaringBitmapArray()
     } else if (isInline) {
       RoaringBitmapArray.readFrom(dvDescriptor.inlineData)
     } else {
       assert(isOnDisk)
       dvStore.read(onDiskPath.get, dvDescriptor.offset.getOrElse(0), dvDescriptor.sizeInBytes)
     }
+
+    // Verify that the cardinality in the bitmap matches the DV descriptor.
+    if (bitmap.cardinality != dvDescriptor.cardinality) {
+      recordDeltaEvent(
+        deltaLog = null,
+        opType = "delta.assertions.deletionVectorReadCardinalityMismatch",
+        data = Map(
+          "deletionVectorPath" -> onDiskPath,
+          "deletionVectorCardinality" -> bitmap.cardinality,
+          "deletionVectorDescriptor" -> dvDescriptor),
+        path = tableDataPath)
+      throw deletionVectorCardinalityMismatch()
+    }
+
+    bitmap
   }
 
   override def size: Int = dvDescriptor.sizeInBytes
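For readers outside the codebase, here is a minimal, self-contained sketch of the invariant the new load path enforces. It uses a plain org.roaringbitmap.RoaringBitmap in place of Delta's RoaringBitmapArray; loadAndVerify and expectedCardinality are illustrative names standing in for the load path and DeletionVectorDescriptor.cardinality, not repo APIs:

import java.nio.ByteBuffer
import org.roaringbitmap.RoaringBitmap

object CardinalityCheckSketch {
  // Deserialize a bitmap and verify it marks exactly as many rows deleted
  // as the descriptor claimed (modeled here by `expectedCardinality`).
  def loadAndVerify(serialized: Array[Byte], expectedCardinality: Long): RoaringBitmap = {
    val bitmap = new RoaringBitmap()
    bitmap.deserialize(ByteBuffer.wrap(serialized))
    if (bitmap.getLongCardinality != expectedCardinality) {
      // A mismatch means the DV file is corrupt or stale relative to its descriptor.
      throw new IllegalStateException(
        s"DV cardinality mismatch: expected $expectedCardinality, " +
          s"got ${bitmap.getLongCardinality}")
    }
    bitmap
  }

  def main(args: Array[String]): Unit = {
    val dv = RoaringBitmap.bitmapOf(1, 5, 7) // rows 1, 5, 7 deleted
    val buf = ByteBuffer.allocate(dv.serializedSizeInBytes())
    dv.serialize(buf)
    loadAndVerify(buf.array(), expectedCardinality = 3L) // passes
    loadAndVerify(buf.array(), expectedCardinality = 4L) // throws
  }
}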
