From ccf7fbc04b85df9cd850f4aa4d1fb6edf7c9c389 Mon Sep 17 00:00:00 2001
From: Christos Stavrakakis
Date: Thu, 28 Sep 2023 20:08:14 +0200
Subject: [PATCH] [Spark] Check DV cardinality when reading DVs

Update DeletionVectorStore to verify that the cardinality of the DVs we
read from files is the expected one.

Closes https://github.com/delta-io/delta/pull/2079

GitOrigin-RevId: c6b3c7a3088bcb05e4541e9e073249c2886b16c9
---
 .../resources/error/delta-error-classes.json |  6 +++++
 .../apache/spark/sql/delta/DeltaErrors.scala  |  7 ++++++
 .../delta/deletionvectors/StoredBitmap.scala  | 22 +++++++++++++++++--
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 22403e7d749..51080b7b93c 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -556,6 +556,12 @@
     ],
     "sqlState" : "0AKDE"
   },
+  "DELTA_DELETION_VECTOR_CARDINALITY_MISMATCH" : {
+    "message" : [
+      "Deletion vector integrity check failed. Encountered a cardinality mismatch."
+    ],
+    "sqlState" : "XXKDS"
+  },
   "DELTA_DELETION_VECTOR_CHECKSUM_MISMATCH" : {
     "message" : [
       "Could not verify deletion vector integrity, CRC checksum verification failed."
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 68625e5f9c8..a7fc14b1077 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2872,6 +2872,13 @@ trait DeltaErrorsBase
       errorClass = "DELTA_CANNOT_RECONSTRUCT_PATH_FROM_URI",
       messageParameters = Array(uri))
 
+  def deletionVectorCardinalityMismatch(): Throwable = {
+    new DeltaChecksumException(
+      errorClass = "DELTA_DELETION_VECTOR_CARDINALITY_MISMATCH",
+      messageParameters = Array.empty,
+      pos = 0
+    )
+  }
 
   def deletionVectorSizeMismatch(): Throwable = {
     new DeltaChecksumException(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/StoredBitmap.scala b/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/StoredBitmap.scala
index 5e5d15e323c..d7e66d62bbc 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/StoredBitmap.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/StoredBitmap.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.delta.deletionvectors
 
 import java.io.{IOException, ObjectInputStream}
 
+import org.apache.spark.sql.delta.DeltaErrorsBase
 import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
+import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.Path
@@ -60,12 +62,13 @@ trait StoredBitmap {
  */
case class DeletionVectorStoredBitmap(
     dvDescriptor: DeletionVectorDescriptor,
-    tableDataPath: Option[Path] = None) extends StoredBitmap {
+    tableDataPath: Option[Path] = None
+) extends StoredBitmap with DeltaLogging with DeltaErrorsBase {
   require(tableDataPath.isDefined || !dvDescriptor.isOnDisk,
     "Table path is required for on-disk deletion vectors")
 
   override def load(dvStore: DeletionVectorStore): RoaringBitmapArray = {
-    if (isEmpty) {
+    val bitmap = if (isEmpty) {
       new RoaringBitmapArray()
     } else if (isInline) {
       RoaringBitmapArray.readFrom(dvDescriptor.inlineData)
@@ -73,6 +76,21 @@ case class DeletionVectorStoredBitmap(
       assert(isOnDisk)
       dvStore.read(onDiskPath.get, dvDescriptor.offset.getOrElse(0), dvDescriptor.sizeInBytes)
     }
+
+    // Verify that the cardinality in the bitmap matches the DV descriptor.
+    if (bitmap.cardinality != dvDescriptor.cardinality) {
+      recordDeltaEvent(
+        deltaLog = null,
+        opType = "delta.assertions.deletionVectorReadCardinalityMismatch",
+        data = Map(
+          "deletionVectorPath" -> onDiskPath,
+          "deletionVectorCardinality" -> bitmap.cardinality,
+          "deletionVectorDescriptor" -> dvDescriptor),
+        path = tableDataPath)
+      throw deletionVectorCardinalityMismatch()
+    }
+
+    bitmap
   }
 
   override def size: Int = dvDescriptor.sizeInBytes
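
Illustration (not part of the patch): the sketch below is a minimal, self-contained Scala example of the check that the last StoredBitmap.scala hunk introduces. SimpleDescriptor, loadBitmap, and CardinalityMismatchException are hypothetical stand-ins for DeletionVectorDescriptor, the DeletionVectorStore read path, and DeltaChecksumException; the real change additionally records a "delta.assertions.deletionVectorReadCardinalityMismatch" event before throwing.

// Hypothetical, self-contained sketch; not Delta code.
final case class SimpleDescriptor(cardinality: Long)

final class CardinalityMismatchException(message: String)
  extends RuntimeException(message)

object DvCardinalityCheckSketch {
  // Stand-in for materializing the bitmap from inline data or from disk.
  def loadBitmap(rows: Seq[Long]): Set[Long] = rows.toSet

  // Mirrors the shape of the new check in DeletionVectorStoredBitmap.load:
  // load the bitmap, compare its cardinality to the descriptor, fail fast.
  def loadAndVerify(descriptor: SimpleDescriptor, rows: Seq[Long]): Set[Long] = {
    val bitmap = loadBitmap(rows)
    if (bitmap.size.toLong != descriptor.cardinality) {
      throw new CardinalityMismatchException(
        s"expected ${descriptor.cardinality} deleted rows, found ${bitmap.size}")
    }
    bitmap
  }

  def main(args: Array[String]): Unit = {
    println(loadAndVerify(SimpleDescriptor(3), Seq(0L, 2L, 7L)))   // passes
    try {
      loadAndVerify(SimpleDescriptor(5), Seq(0L, 2L, 7L))          // mismatch
    } catch {
      case e: CardinalityMismatchException => println(s"rejected: ${e.getMessage}")
    }
  }
}

Failing fast on the mismatch keeps a corrupted or stale DV file from silently producing the wrong set of deleted rows at scan time.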