
Commit a2392be

dtenedor authored and HyukjinKwon committed
[SPARK-41862][SQL] Fix correctness bug related to DEFAULT values in Orc reader
### What changes were proposed in this pull request?

This PR fixes a correctness bug related to column DEFAULT values in the Orc reader.

* #37280 introduced a performance regression in the Orc reader.
* #39362 fixed the performance regression, but stopped the column DEFAULT feature from working, causing a temporary correctness regression that we agreed for me to fix later.
* This PR restores column DEFAULT functionality for Orc scans and fixes the correctness regression without reintroducing the performance regression.

### Why are the changes needed?

This PR fixes a correctness bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR updates a unit test to exercise that the Orc scan functionality is correct.

Closes #39370 from dtenedor/fix-perf-bug-orc-reader.

Authored-by: Daniel Tenedorio <daniel.tenedorio@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent c26d598 · commit a2392be

2 files changed (+19 −67 lines)
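At a glance, the fix precomputes which columns need their existence DEFAULT once per reader, and applies those defaults once per deserialized row, instead of tracking every column write. A minimal sketch of that scheme, using plain arrays in place of Catalyst rows and hypothetical helper names (the real changes follow in the diffs below):

// Sketch only: simplified types (plain arrays instead of Catalyst rows) and
// hypothetical helper names.

// Once per reader: mark exactly the columns that are absent from the Orc file
// (requestedColIds(i) == -1) and that carry a non-null existence DEFAULT.
def computeDefaultsBitmask(
    requestedColIds: Array[Int],
    existenceDefaultValues: Array[Any]): Array[Boolean] =
  Array.tabulate(requestedColIds.length) { i =>
    requestedColIds(i) == -1 && existenceDefaultValues(i) != null
  }

// Once per row, after the requested columns are deserialized: fill in only
// the marked columns. Nothing is tracked on the per-value write path anymore.
def applyDefaults(
    bitmask: Array[Boolean], defaults: Array[Any], row: Array[Any]): Unit =
  for (i <- bitmask.indices if bitmask(i)) row(i) = defaults(i)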


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcDeserializer.scala

Lines changed: 16 additions & 55 deletions
@@ -42,21 +42,26 @@ class OrcDeserializer(
   // is always null in this case
   // - a function that updates target column `index` otherwise.
   private val fieldWriters: Array[WritableComparable[_] => Unit] = {
+    // Assume we create a table backed by Orc files. Then if we later run a command "ALTER TABLE t
+    // ADD COLUMN c DEFAULT <value>" on the Orc table, this adds one field to the Catalyst schema.
+    // Then if we query the old files with the new Catalyst schema, we should only apply the
+    // existence default value to the columns whose IDs are not explicitly requested.
+    if (requiredSchema.hasExistenceDefaultValues) {
+      for (i <- 0 until requiredSchema.existenceDefaultValues.size) {
+        requiredSchema.existenceDefaultsBitmask(i) =
+          if (requestedColIds(i) != -1) {
+            false
+          } else {
+            requiredSchema.existenceDefaultValues(i) != null
+          }
+      }
+    }
     requiredSchema.zipWithIndex
       .map { case (f, index) =>
         if (requestedColIds(index) == -1) {
           null
         } else {
-          // Create a RowUpdater instance for converting Orc objects to Catalyst rows. If any fields
-          // in the Orc result schema have associated existence default values, maintain a
-          // boolean array to track which fields have been explicitly assigned for each row.
-          val rowUpdater: RowUpdater =
-            if (requiredSchema.hasExistenceDefaultValues) {
-              resetExistenceDefaultsBitmask(requiredSchema)
-              new RowUpdaterWithBitmask(resultRow, requiredSchema.existenceDefaultsBitmask)
-            } else {
-              new RowUpdater(resultRow)
-            }
+          val rowUpdater = new RowUpdater(resultRow)
           val writer = newWriter(f.dataType, rowUpdater)
           (value: WritableComparable[_]) => writer(index, value)
         }
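As a worked example of the new bitmask block above (hypothetical schema): if a table's Orc files were written with columns (a, b) and the Catalyst schema later gained c via ALTER TABLE t ADD COLUMN c STRING DEFAULT 'abc', then scanning an old file yields requestedColIds = Array(0, 1, -1), and only c's slot is marked:

// Hypothetical values: an old two-column file (a, b) read with the new
// three-column schema (a, b, c DEFAULT 'abc'); c is absent from the file.
val requestedColIds = Array(0, 1, -1)
val existenceDefaultValues = Array[Any](null, null, "abc")
val bitmask = Array.tabulate(requestedColIds.length) { i =>
  requestedColIds(i) == -1 && existenceDefaultValues(i) != null
}
assert(bitmask.sameElements(Array(false, false, true)))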
@@ -93,6 +98,7 @@ class OrcDeserializer(
       }
       targetColumnIndex += 1
     }
+    applyExistenceDefaultValuesToRow(requiredSchema, resultRow)
     resultRow
   }
 
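The call added above is the per-row half of the scheme: once the writers have filled the requested columns, the helper overwrites only the bitmask-marked ordinals with their defaults. A sketch of the effect, continuing the (a, b, c DEFAULT 'abc') example with plain arrays in place of the real InternalRow:

// After deserializing one row of the old two-column file, ordinals 0 and 1
// hold file data; applying the existence defaults touches only ordinal 2.
val resultRow = Array[Any](1, "x", null)
val bitmask = Array(false, false, true)
val defaults = Array[Any](null, null, "abc")
for (i <- bitmask.indices if bitmask(i)) resultRow(i) = defaults(i)
assert(resultRow.sameElements(Array[Any](1, "x", "abc")))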
@@ -288,49 +294,4 @@ class OrcDeserializer(
     override def setDouble(ordinal: Int, value: Double): Unit = array.setDouble(ordinal, value)
     override def setFloat(ordinal: Int, value: Float): Unit = array.setFloat(ordinal, value)
   }
-
-  /**
-   * Subclass of RowUpdater that also updates a boolean array bitmask. In this way, after all
-   * assignments are complete, it is possible to inspect the bitmask to determine which columns have
-   * been written at least once.
-   */
-  final class RowUpdaterWithBitmask(
-      row: InternalRow, bitmask: Array[Boolean]) extends RowUpdater(row) {
-    override def setNullAt(ordinal: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setNullAt(ordinal)
-    }
-    override def set(ordinal: Int, value: Any): Unit = {
-      bitmask(ordinal) = false
-      super.set(ordinal, value)
-    }
-    override def setBoolean(ordinal: Int, value: Boolean): Unit = {
-      bitmask(ordinal) = false
-      super.setBoolean(ordinal, value)
-    }
-    override def setByte(ordinal: Int, value: Byte): Unit = {
-      bitmask(ordinal) = false
-      super.setByte(ordinal, value)
-    }
-    override def setShort(ordinal: Int, value: Short): Unit = {
-      bitmask(ordinal) = false
-      super.setShort(ordinal, value)
-    }
-    override def setInt(ordinal: Int, value: Int): Unit = {
-      bitmask(ordinal) = false
-      super.setInt(ordinal, value)
-    }
-    override def setLong(ordinal: Int, value: Long): Unit = {
-      bitmask(ordinal) = false
-      super.setLong(ordinal, value)
-    }
-    override def setDouble(ordinal: Int, value: Double): Unit = {
-      bitmask(ordinal) = false
-      super.setDouble(ordinal, value)
-    }
-    override def setFloat(ordinal: Int, value: Float): Unit = {
-      bitmask(ordinal) = false
-      super.setFloat(ordinal, value)
-    }
-  }
 }
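This deletion is where the performance regression lived: every primitive write went through an overriding setter that also cleared a bitmask slot, so the bookkeeping cost scaled with rows times columns. A rough, hypothetical cost comparison (not a benchmark):

// R rows, C requested columns, D columns that actually need an existence
// default (commonly D = 0 when no columns were added after the files were
// written).
def oldExtraStores(r: Long, c: Long): Long = r * c              // one per value written
def newExtraStores(r: Long, c: Long, d: Long): Long = c + r * d // setup + per-row fill
// e.g. 10 million rows x 20 columns, no missing columns:
// old: 200,000,000 extra bitmask stores; new: 20 (the one-time setup).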

sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala

Lines changed: 3 additions & 12 deletions
@@ -1552,7 +1552,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
     case class Config(
         sqlConf: Option[(String, String)],
-        insertNullsToStorage: Boolean = true,
         useDataFrames: Boolean = false)
     def runTest(dataSource: String, config: Config): Unit = {
       def insertIntoT(): Unit = {
@@ -1591,10 +1590,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           sql("insert into t values(null, null, null)")
         }
         sql("alter table t add column (x boolean default true)")
-        // By default, INSERT commands into some tables (such as JSON) do not store NULL values.
-        // Therefore, if such destination columns have DEFAULT values, SELECTing the same columns
-        // will return the default values (instead of NULL) since nothing is present in storage.
-        val insertedSColumn = if (config.insertNullsToStorage) null else "abcdef"
+        val insertedSColumn = null
         checkAnswer(spark.table("t"),
           Seq(
             Row("xyz", 42, "abcdef", true),
@@ -1679,8 +1675,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           Config(
             None),
           Config(
-            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false"),
-            insertNullsToStorage = false))),
+            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")))),
       TestCase(
         dataSource = "parquet",
         Seq(
@@ -1944,11 +1939,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl"))),
             Seq(Map(true -> "xyz"))),
           Row(2,
-            if (config.dataSource != "orc") {
-              null
-            } else {
-              Row(Seq(Row(1, 2)), Seq(Map(false -> "def", true -> "jkl")))
-            },
+            null,
             Seq(Map(true -> "xyz"))),
           Row(3,
             Row(Seq(Row(3, 4)), Seq(Map(false -> "mno", true -> "pqr"))),
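The deleted special cases summarize the user-visible effect of the fix: Orc now behaves like the other data sources, so a NULL that was explicitly written survives the scan, and the existence DEFAULT fills in only for columns entirely absent from old files. A hypothetical condensation of the Orc scenario the test exercises (column names and types are illustrative):

// Hypothetical condensation of the Orc case exercised above.
sql("create table t(a string, i int, s string default 'abcdef') using orc")
sql("insert into t values('xyz', 42, 'abcdef')")
sql("insert into t values(null, null, null)")  // the NULL in s is stored
sql("alter table t add column (x boolean default true)")
// Both pre-existing rows get x = true from the existence DEFAULT, but the
// stored NULL in s must come back as NULL, not as the default 'abcdef'.
checkAnswer(spark.table("t"), Seq(
  Row("xyz", 42, "abcdef", true),
  Row(null, null, null, true)))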
