[Spark] Support predicate pushdown in scans with DVs (#2933)
## Description

Currently, when Deletion Vectors are enabled, we disable predicate
pushdown and splitting in scans. This is because we rely on a custom row
index column that is constructed in the executors and cannot handle
splits or pushed-down predicates. These restrictions can now be lifted
by relying instead on `_metadata.row_index`, which was recently exposed
after the relevant [work](https://issues.apache.org/jira/browse/SPARK-37980)
concluded.

Overall, this PR adds support for predicate pushdown and splits as follows:

1. Replaces `__delta_internal_is_row_deleted` with `_metadata.row_index`.
2. Adds a new implementation of `__delta_internal_is_row_deleted` that
   is based on `_metadata.row_index`.
3. Makes the `IsRowDeleted` filter non-deterministic to allow predicate
   pushdown.

Furthermore, it includes the previous related
[work](#2576) to remove the UDF
from the `IsRowDeleted` filter.
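
A minimal, hypothetical sketch of the user-visible effect (the table path and the predicate are made up): with this change, a filter on a DV-enabled table can be pushed down into the Parquet scan and the file can be split across tasks, instead of being read whole and filtered afterwards.

```scala
// Assumes `spark` is an active SparkSession and /tmp/dv_table is a Delta table
// with deletion vectors enabled; both are illustrative.
val df = spark.read.format("delta")
  .load("/tmp/dv_table")
  .where("value > 100") // can now be pushed down to the Parquet reader
df.show()
```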

## How was this patch tested?
Added new suites.
andreaschat-db authored Apr 26, 2024
1 parent 78970ab commit f4a4944
Showing 15 changed files with 759 additions and 173 deletions.
32 changes: 28 additions & 4 deletions spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta;

import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
@@ -26,14 +27,37 @@ public interface RowIndexFilter {

/**
* Materialize filtering information for all rows in the range [start, end)
* by filling a boolean column vector batch.
* by filling a boolean column vector batch. Assumes the indexes of the rows in the batch are
* consecutive and start from 0.
*
* @param start Beginning index of the filtering range (inclusive)
* @param end End index of the filtering range (exclusive)
* @param batch The column vector for the current batch to materialize the range into
* @param start Beginning index of the filtering range (inclusive).
* @param end End index of the filtering range (exclusive).
* @param batch The column vector for the current batch to materialize the range into.
*/
void materializeIntoVector(long start, long end, WritableColumnVector batch);

/**
* Materialize filtering information for all rows in the batch. This is achieved by probing
* the roaring bitmap with the row index of every row in the batch.
*
* @param batchSize The size of the batch.
* @param rowIndexColumn A column vector that contains the row index of each row in the batch.
* @param batch The column vector for the current batch to materialize the range into.
*/
void materializeIntoVectorWithRowIndex(
int batchSize,
ColumnVector rowIndexColumn,
WritableColumnVector batch);

/**
* Materialize filtering information for batches with a single row.
*
* @param rowIndex The index of the row to materialize the filtering information.
* @param batch The column vector for the current batch to materialize the range into.
* We assume it contains a single row.
*/
void materializeSingleRowWithRowIndex(long rowIndex, WritableColumnVector batch);

/**
* Value that must be materialised for a row to be kept after filtering.
*/
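For illustration, here is a minimal Scala sketch of a `RowIndexFilter` implementation covering the new methods, assuming the interface contains only the three methods shown above. A plain `Set[Long]` stands in for the roaring bitmap used by the real filters, and the byte values 0 (keep) and 1 (drop) stand in for the constants defined on the interface.

```scala
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
import org.apache.spark.sql.vectorized.ColumnVector

// Sketch only: a Set[Long] stands in for the roaring bitmap of deleted rows.
class ExampleDropRowsFilter(deletedRows: Set[Long]) extends RowIndexFilter {

  private def flag(rowIndex: Long): Byte =
    if (deletedRows.contains(rowIndex)) 1.toByte else 0.toByte // 1 = drop, 0 = keep

  // Legacy path: assumes consecutive row indexes starting at `start`.
  override def materializeIntoVector(
      start: Long, end: Long, batch: WritableColumnVector): Unit = {
    var i = 0
    while (start + i < end) {
      batch.putByte(i, flag(start + i))
      i += 1
    }
  }

  // New path: probe with the row index the Parquet reader attached to each row.
  override def materializeIntoVectorWithRowIndex(
      batchSize: Int, rowIndexColumn: ColumnVector, batch: WritableColumnVector): Unit = {
    for (i <- 0 until batchSize) {
      batch.putByte(i, flag(rowIndexColumn.getLong(i)))
    }
  }

  // Single-row batches (non-vectorized reader path).
  override def materializeSingleRowWithRowIndex(
      rowIndex: Long, batch: WritableColumnVector): Unit = {
    batch.putByte(0, flag(rowIndex))
  }
}
```

The key difference is that `materializeIntoVectorWithRowIndex` no longer assumes consecutive row indexes, which is what makes file splitting and predicate pushdown safe.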
spark/src/main/scala/org/apache/spark/sql/delta/DeltaSparkSessionExtension.scala
@@ -135,6 +135,9 @@ class DeltaSparkSessionExtension extends (SparkSessionExtensions => Unit) {
new PrepareDeltaScan(session)
}

// Add skip row column and filter.
extensions.injectPlannerStrategy(PreprocessTableWithDVsStrategy)

// Tries to load PrepareDeltaSharingScan class with class reflection, when delta-sharing-spark
// 3.1+ package is installed, this will be loaded and delta sharing batch queries with
// DeltaSharingFileIndex will be handled by the rule.
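For context, a planner strategy is registered through `SparkSessionExtensions.injectPlannerStrategy`, which takes a `SparkSession => Strategy` builder; a case class companion object satisfies that shape, which is why `PreprocessTableWithDVsStrategy` can be passed directly. A minimal sketch with a hypothetical no-op strategy:

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy: the real PreprocessTableWithDVsStrategy plans DV scans
// so that the skip-row column is added and filtered on.
case class ExampleDvStrategy(session: SparkSession) extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil // no-op placeholder
}

class ExampleExtension extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The case class companion is a `SparkSession => Strategy` function,
    // so it can be passed directly, as Delta does above.
    extensions.injectPlannerStrategy(ExampleDvStrategy)
  }
}
```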
spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -22,8 +22,10 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.delta.RowIndexFilterType
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol}
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAllRowsFilter, KeepMarkedRowsFilter}
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
@@ -55,14 +57,18 @@ case class DeltaParquetFileFormat(
protocol: Protocol,
metadata: Metadata,
nullableRowTrackingFields: Boolean = false,
isSplittable: Boolean = true,
disablePushDowns: Boolean = false,
tablePath: Option[String] = None)
optimizationsEnabled: Boolean = true,
tablePath: Option[String] = None,
isCDCRead: Boolean = false)
extends ParquetFileFormat {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
require(!isSplittable && disablePushDowns,
"Wrong arguments for Delta table scan with deletion vectors")
SparkSession.getActiveSession.map { session =>
val useMetadataRowIndex =
session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
require(useMetadataRowIndex == optimizationsEnabled,
"Wrong arguments for Delta table scan with deletion vectors")
}
}

TypeWidening.assertTableReadable(protocol, metadata)
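The `require` above ties `optimizationsEnabled` to the SQL conf referenced in the diff, `DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX`. A hedged sketch of toggling it per session (the exact key string is defined in `DeltaSQLConf` and is not repeated here):

```scala
import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Assumes `spark` is an active SparkSession; enables use of _metadata.row_index
// (and therefore splits and predicate pushdown) for scans over tables with DVs.
spark.conf.set(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX.key, "true")
```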
@@ -107,7 +113,7 @@ case class DeltaParquetFileFormat(
* physical column names.
*/
private def prepareFiltersForRead(filters: Seq[Filter]): Seq[Filter] = {
if (disablePushDowns) {
if (!optimizationsEnabled) {
Seq.empty
} else if (columnMappingMode != NoMapping) {
val physicalNameMap = DeltaColumnMapping.getLogicalNameToPhysicalNameMap(referenceSchema)
@@ -118,7 +124,9 @@
}

override def isSplitable(
sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = optimizationsEnabled

def hasTablePath: Boolean = tablePath.isDefined

@@ -131,8 +139,7 @@ case class DeltaParquetFileFormat(
case ff: DeltaParquetFileFormat =>
ff.columnMappingMode == columnMappingMode &&
ff.referenceSchema == referenceSchema &&
ff.isSplittable == isSplittable &&
ff.disablePushDowns == disablePushDowns
ff.optimizationsEnabled == optimizationsEnabled
case _ => false
}
}
@@ -147,6 +154,10 @@ case class DeltaParquetFileFormat(
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {

val useMetadataRowIndexConf = DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX
val useMetadataRowIndex = sparkSession.sessionState.conf.getConf(useMetadataRowIndexConf)

val parquetDataReader: PartitionedFile => Iterator[InternalRow] =
super.buildReaderWithPartitionValues(
sparkSession,
@@ -166,17 +177,26 @@ case class DeltaParquetFileFormat(
}
results.headOption.map(e => ColumnMetadata(e._2, e._1))
}
val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME)
val rowIndexColumn = findColumn(ROW_INDEX_COLUMN_NAME)

if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) {
return parquetDataReader // no additional metadata is needed.
val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME)
val rowIndexColumnName = if (useMetadataRowIndex) {
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
} else {
// verify the file splitting and filter pushdown are disabled. The new additional
// metadata columns cannot be generated with file splitting and filter pushdowns
require(!isSplittable, "Cannot generate row index related metadata with file splitting")
require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")
ROW_INDEX_COLUMN_NAME
}
val rowIndexColumn = findColumn(rowIndexColumnName)

// We don't have any additional columns to generate, just return the original reader as is.
if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return parquetDataReader

// We are using the row_index col generated by the parquet reader and there are no more
// columns to generate.
if (useMetadataRowIndex && isRowDeletedColumn.isEmpty) return parquetDataReader

// Verify that either predicate pushdown with metadata column is enabled or optimizations
// are disabled.
require(useMetadataRowIndex || !optimizationsEnabled,
"Cannot generate row index related metadata with file splitting or predicate pushdown")

if (hasTablePath && isRowDeletedColumn.isEmpty) {
throw new IllegalArgumentException(
@@ -196,7 +216,8 @@ case class DeltaParquetFileFormat(
isRowDeletedColumn,
rowIndexColumn,
useOffHeapBuffers,
serializableHadoopConf)
serializableHadoopConf,
useMetadataRowIndex)
iterToReturn.asInstanceOf[Iterator[InternalRow]]
} catch {
case NonFatal(e) =>
@@ -217,20 +238,19 @@ case class DeltaParquetFileFormat(
}

override def metadataSchemaFields: Seq[StructField] = {
val rowTrackingFields =
RowTracking.createMetadataStructFields(protocol, metadata, nullableRowTrackingFields)
// TODO(SPARK-47731): Parquet reader in Spark has a bug where a file containing 2b+ rows
// in a single rowgroup causes it to run out of the `Integer` range.
// For Delta Parquet readers don't expose the row_index field as a metadata field.
if (!RowId.isEnabled(protocol, metadata)) {
super.metadataSchemaFields.filter(_ != ParquetFileFormat.ROW_INDEX_FIELD)
} else {
// It is fine to expose the row_index field as a metadata field when Row Tracking
// is enabled because it is needed to generate the Row ID field, and it is not a
// big problem if we use 2b+ rows in a single rowgroup, it will throw an exception and
// we can then use less rows per rowgroup. Also, 2b+ rows in a single rowgroup is
// not a common use case.
super.metadataSchemaFields ++ rowTrackingFields
// For Delta Parquet readers don't expose the row_index field as a metadata field when it is
// not strictly required. We do expose it when Row Tracking or DVs are enabled.
// In general, having 2b+ rows in a single rowgroup is not a common use case. When the issue is
// hit an exception is thrown.
(protocol, metadata) match {
// We should not expose row tracking fields for CDC reads.
case (p, m) if RowId.isEnabled(p, m) && !isCDCRead =>
val extraFields = RowTracking.createMetadataStructFields(p, m, nullableRowTrackingFields)
super.metadataSchemaFields ++ extraFields
case (p, m) if deletionVectorsReadable(p, m) => super.metadataSchemaFields
case _ => super.metadataSchemaFields.filter(_ != ParquetFileFormat.ROW_INDEX_FIELD)
}
}
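Because the row_index metadata field is now kept in the metadata schema whenever deletion vectors are readable (or Row Tracking is enabled), it can be selected directly. A small sketch, assuming a DV-enabled Delta table at an illustrative path:

```scala
import org.apache.spark.sql.functions.col

// Illustrative: _metadata.row_index is the Parquet-reader-generated row index
// that the DV filter now relies on.
spark.read.format("delta").load("/tmp/dv_table")
  .select(col("value"), col("_metadata.row_index"))
  .show()
```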

@@ -275,10 +295,12 @@ case class DeltaParquetFileFormat(
.updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion)
}

def disableSplittingAndPushdown(tablePath: String): DeltaParquetFileFormat = {
def copyWithDVInfo(
tablePath: String,
optimizationsEnabled: Boolean): DeltaParquetFileFormat = {
// When predicate pushdown is enabled we allow both splits and predicate pushdown.
this.copy(
isSplittable = false,
disablePushDowns = true,
optimizationsEnabled = optimizationsEnabled,
tablePath = Some(tablePath))
}
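A sketch of how the scan-preparation side might call this, under the assumption that the caller reads the same conf; the function below is a placeholder, and it would have to live inside Delta's own `org.apache.spark.sql` packages for `sessionState` to be accessible:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Placeholder caller: the real logic lives in Delta's scan-preparation rules.
def prepareFormatForDvScan(
    session: SparkSession,
    fileFormat: DeltaParquetFileFormat,
    tablePath: String): DeltaParquetFileFormat = {
  val useMetadataRowIndex = session.sessionState.conf.getConf(
    DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
  // Keep splits and pushdown enabled exactly when _metadata.row_index is used,
  // which also satisfies the constructor require shown earlier.
  fileFormat.copyWithDVInfo(tablePath, optimizationsEnabled = useMetadataRowIndex)
}
```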

Expand All @@ -287,16 +309,21 @@ case class DeltaParquetFileFormat(
* following metadata columns.
* - [[IS_ROW_DELETED_COLUMN_NAME]] - row deleted status from deletion vector corresponding
* to this file
* - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file.
* - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. Note, this column is only
* populated when we are not using _metadata.row_index column.
*/
private def iteratorWithAdditionalMetadataColumns(
partitionedFile: PartitionedFile,
iterator: Iterator[Object],
isRowDeletedColumn: Option[ColumnMetadata],
rowIndexColumn: Option[ColumnMetadata],
isRowDeletedColumnOpt: Option[ColumnMetadata],
rowIndexColumnOpt: Option[ColumnMetadata],
useOffHeapBuffers: Boolean,
serializableHadoopConf: SerializableConfiguration): Iterator[Object] = {
val rowIndexFilter = isRowDeletedColumn.map { col =>
serializableHadoopConf: SerializableConfiguration,
useMetadataRowIndex: Boolean): Iterator[Object] = {
require(!useMetadataRowIndex || rowIndexColumnOpt.isDefined,
"useMetadataRowIndex is enabled but rowIndexColumn is not defined.")

val rowIndexFilterOpt = isRowDeletedColumnOpt.map { col =>
// Fetch the DV descriptor from the broadcast map and create a row index filter
val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
.get(FILE_ROW_INDEX_FILTER_ID_ENCODED)
@@ -322,37 +349,49 @@ case class DeltaParquetFileFormat(
}
}

val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get)
// We only generate the row index column when predicate pushdown is not enabled.
val rowIndexColumnToWriteOpt = if (useMetadataRowIndex) None else rowIndexColumnOpt
val metadataColumnsToWrite =
Seq(isRowDeletedColumnOpt, rowIndexColumnToWriteOpt).filter(_.nonEmpty).map(_.get)

// Unfortunately there is no way to verify the Parquet index is starting from 0.
// We disable the splits, so the assumption is ParquetFileFormat respects that
// When metadata.row_index is not used there is no way to verify the Parquet index is
// starting from 0. We disable the splits, so the assumption is ParquetFileFormat respects
// that.
var rowIndex: Long = 0

// Used only when non-column row batches are received from the Parquet reader
val tempVector = new OnHeapColumnVector(1, ByteType)

iterator.map { row =>
row match {
case batch: ColumnarBatch => // When vectorized Parquet reader is enabled
case batch: ColumnarBatch => // When vectorized Parquet reader is enabled.
val size = batch.numRows()
// Create vectors for all needed metadata columns.
// We can't use the one from Parquet reader as it set the
// [[WritableColumnVector.isAllNulls]] to true and it can't be reset with using any
// public APIs.
trySafely(useOffHeapBuffers, size, metadataColumns) { writableVectors =>
trySafely(useOffHeapBuffers, size, metadataColumnsToWrite) { writableVectors =>
val indexVectorTuples = new ArrayBuffer[(Int, ColumnVector)]

// When predicate pushdown is enabled we use _metadata.row_index. Therefore,
// we only need to construct the isRowDeleted column.
var index = 0
isRowDeletedColumn.foreach { columnMetadata =>
isRowDeletedColumnOpt.foreach { columnMetadata =>
val isRowDeletedVector = writableVectors(index)
rowIndexFilter.get
.materializeIntoVector(rowIndex, rowIndex + size, isRowDeletedVector)
if (useMetadataRowIndex) {
rowIndexFilterOpt.get.materializeIntoVectorWithRowIndex(
size, batch.column(rowIndexColumnOpt.get.index), isRowDeletedVector)
} else {
rowIndexFilterOpt.get
.materializeIntoVector(rowIndex, rowIndex + size, isRowDeletedVector)
}
indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
index += 1
}

rowIndexColumn.foreach { columnMetadata =>
rowIndexColumnToWriteOpt.foreach { columnMetadata =>
val rowIndexVector = writableVectors(index)
// populate the row index column value
// populate the row index column value.
for (i <- 0 until size) {
rowIndexVector.putLong(i, rowIndex + i)
}
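To make the vectorized branch above concrete, here is a simplified, hypothetical helper: it fills a fresh byte vector by probing the reader-produced row-index column, then swaps it into the batch at the position of the skip-row placeholder column. The column indexes and the helper name are illustrative, and the real code additionally handles off-heap buffers and resource cleanup via `trySafely`.

```scala
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.ByteType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

def annotateBatch(
    filter: RowIndexFilter,
    batch: ColumnarBatch,
    isRowDeletedIdx: Int,                 // position of the skip-row placeholder column
    rowIndexIdx: Int): ColumnarBatch = {  // position of _metadata.row_index
  val size = batch.numRows()
  val isRowDeleted = new OnHeapColumnVector(size, ByteType)
  // Probe the deletion vector with the row index of every row in the batch.
  filter.materializeIntoVectorWithRowIndex(size, batch.column(rowIndexIdx), isRowDeleted)
  val columns = Array.tabulate[ColumnVector](batch.numCols()) { i =>
    if (i == isRowDeletedIdx) isRowDeleted else batch.column(i)
  }
  new ColumnarBatch(columns, size)
}
```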
@@ -373,26 +412,38 @@ case class DeltaParquetFileFormat(
// column values. This is not efficient. It should affect only the wide
// tables. https://github.com/delta-io/delta/issues/2246
val newRow = columnarRow.copy();
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
isRowDeletedColumnOpt.foreach { columnMetadata =>
val rowIndexForFiltering = if (useMetadataRowIndex) {
columnarRow.getLong(rowIndexColumnOpt.get.index)
} else {
rowIndex
}
rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering, tempVector)
newRow.setByte(columnMetadata.index, tempVector.getByte(0))
}

rowIndexColumn.foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
rowIndexColumnToWriteOpt
.foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
rowIndex += 1
newRow

newRow
case rest: InternalRow => // When vectorized Parquet reader is disabled
// Temporary vector variable used to get DV values from RowIndexFilter
// Currently the RowIndexFilter only supports writing into a columnar vector
// and doesn't have methods to get DV value for a specific row index.
// TODO: This is not efficient, but it is ok given the default reader is vectorized
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
isRowDeletedColumnOpt.foreach { columnMetadata =>
val rowIndexForFiltering = if (useMetadataRowIndex) {
rest.getLong(rowIndexColumnOpt.get.index)
} else {
rowIndex
}
rowIndexFilterOpt.get.materializeSingleRowWithRowIndex(rowIndexForFiltering, tempVector)
rest.setByte(columnMetadata.index, tempVector.getByte(0))
}

rowIndexColumn.foreach(columnMetadata => rest.setLong(columnMetadata.index, rowIndex))
rowIndexColumnToWriteOpt
.foreach(columnMetadata => rest.setLong(columnMetadata.index, rowIndex))
rowIndex += 1
rest
case others =>
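Similarly, for the row-based branches above, the filter can only write into column vectors, so a single-slot scratch vector is used per row. A simplified, hypothetical helper (the ordinals are illustrative):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.ByteType

// Scratch vector reused across rows; RowIndexFilter only writes into vectors.
val tempVector = new OnHeapColumnVector(1, ByteType)

def annotateRow(
    filter: RowIndexFilter,
    row: InternalRow,
    isRowDeletedOrdinal: Int,             // ordinal of the skip-row column
    rowIndexOrdinal: Int): InternalRow = {  // ordinal of _metadata.row_index
  val rowIndex = row.getLong(rowIndexOrdinal)
  filter.materializeSingleRowWithRowIndex(rowIndex, tempVector)
  row.setByte(isRowDeletedOrdinal, tempVector.getByte(0))
  row
}
```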