Commit 624eda5

vladanvasi-db authored and cloud-fan committed
[SPARK-49444][SQL] Modified UnivocityParser to throw runtime exceptions caused by ArrayIndexOutOfBounds with more user-oriented messages
### What changes were proposed in this pull request?

I propose to catch and rethrow runtime `ArrayIndexOutOfBounds` exceptions in the `parse` method of the `UnivocityParser` class, but with more user-oriented messages. Instead of throwing exceptions in the original format, I propose to inform users which csv record caused the error.

### Why are the changes needed?

Properly informing users about errors improves the user experience. Instead of throwing an `ArrayIndexOutOfBounds` exception with no clear reason why it happened, the proposed changes throw a `SparkRuntimeException` whose message includes the original csv line that caused the error.

### Does this PR introduce _any_ user-facing change?

This PR introduces a user-facing change that occurs when `UnivocityParser` parses a malformed csv line from the input. More specifically, the change is reproduced in the test case within `UnivocityParserSuite` where the user specifies `maxColumns` in the parser options and the parsed csv record has more columns than that. Instead of an `ArrayIndexOutOfBounds` exception, as mentioned in the HMR ticket, users now get a `SparkRuntimeException` whose message contains the input line that caused the error.

### How was this patch tested?

This patch was tested in `UnivocityParserSuite`. The test named "Array index out of bounds when parsing CSV with more columns than expected" covers this patch. Additionally, a test for bad records in `UnivocityParser`'s `PERMISSIVE` mode is added to confirm that `BadRecordException` is thrown properly.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47906 from vladanvasi-db/vladanvasi-db/univocity-parser-index-out-of-bounds-handling.

Authored-by: Vladan Vasić <vladan.vasic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent 87b5ffb commit 624eda5
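
For context, a minimal repro of the user-facing change (a sketch only: the file path and its contents are illustrative, not taken from this PR):

```scala
// Hypothetical spark-shell session. Assume /tmp/five-columns.csv contains the
// single line "1,string,3.14,5,7" while the reader is capped at two columns.
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("i", IntegerType)
  .add("s", StringType)

spark.read
  .schema(schema)
  .option("maxColumns", "2")
  .csv("/tmp/five-columns.csv")
  .collect()
// Before this patch: an opaque ArrayIndexOutOfBoundsException from deep inside
// the parser. After: a SparkException (FAILED_READ_FILE) whose cause is a
// SparkRuntimeException [MALFORMED_CSV_RECORD] carrying the offending record.
```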

5 files changed: 92 additions, 6 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala

Lines changed: 17 additions & 2 deletions
@@ -21,9 +21,10 @@ import java.io.InputStream
 
 import scala.util.control.NonFatal
 
+import com.univocity.parsers.common.TextParsingException
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkRuntimeException, SparkUpgradeException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -294,6 +295,20 @@ class UnivocityParser(
     }
   }
 
+  private def parseLine(line: String): Array[String] = {
+    try {
+      tokenizer.parseLine(line)
+    }
+    catch {
+      case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+        throw new SparkRuntimeException(
+          errorClass = "MALFORMED_CSV_RECORD",
+          messageParameters = Map("badRecord" -> line),
+          cause = e
+        )
+    }
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
@@ -306,7 +321,7 @@ class UnivocityParser(
       (_: String) => Some(InternalRow.empty)
     } else {
       // parse if the columnPruning is disabled or requiredSchema is nonEmpty
-      (input: String) => convert(tokenizer.parseLine(input))
+      (input: String) => convert(parseLine(input))
     }
   }
 
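For readers unfamiliar with the underlying failure mode, a standalone sketch of the exception chain the new `parseLine` guards against (the settings and input below are assumptions for illustration, not taken from the PR):

```scala
import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

object MaxColumnsRepro {
  def main(args: Array[String]): Unit = {
    val settings = new CsvParserSettings()
    settings.setMaxColumns(2) // deliberately smaller than the record below
    val tokenizer = new CsvParser(settings)

    try {
      tokenizer.parseLine("1,string,3.14,5,7")
    } catch {
      // In the scenario this PR fixes, univocity surfaces the overflow as a
      // TextParsingException caused by an ArrayIndexOutOfBoundsException;
      // the new parseLine re-wraps exactly that case as a SparkRuntimeException.
      case e: TextParsingException =>
        println(s"cause of TextParsingException: ${e.getCause}")
    }
  }
}
```
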
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala

Lines changed: 37 additions & 2 deletions
@@ -23,12 +23,12 @@ import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkRuntimeException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -323,6 +323,41 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       parameters = Map("fieldName" -> "`i`", "fields" -> ""))
   }
 
+  test("Bad records test in permissive mode") {
+    def checkBadRecord(
+        input: String = "1,a",
+        dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING, d DOUBLE"),
+        requiredSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
+        options: Map[String, String] = Map("mode" -> "PERMISSIVE")): BadRecordException = {
+      val csvOptions = new CSVOptions(options, false, "UTC")
+      val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions, Seq())
+      intercept[BadRecordException] {
+        parser.parse(input)
+      }
+    }
+
+    // Bad record exception caused by conversion error
+    checkBadRecord(input = "1.5,a,10.3")
+
+    // Bad record exception caused by insufficient number of columns
+    checkBadRecord(input = "2")
+  }
+
+  test("Array index out of bounds when parsing CSV with more columns than expected") {
+    val input = "1,string,3.14,5,7"
+    val dataSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
+    val requiredSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
+    val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
+    val filters = Seq()
+    val parser = new UnivocityParser(dataSchema, requiredSchema, options, filters)
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        parser.parse(input)
+      },
+      condition = "MALFORMED_CSV_RECORD",
+      parameters = Map("badRecord" -> "1,string,3.14,5,7"))
+  }
+
   test("SPARK-30960: parse date/timestamp string with legacy format") {
     def check(parser: UnivocityParser): Unit = {
       // The legacy format allows 1 or 2 chars for some fields.

sql/core/src/test/resources/test-data/more-columns.csv (new file; path inferred from `moreColumnsFile` in CSVSuite)

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+1,3.14,string,5,7

sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala

Lines changed: 3 additions & 2 deletions
@@ -24,7 +24,8 @@ import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.{SparkException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{SparkException, SparkRuntimeException,
+  SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -234,7 +235,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     val schema = new StructType().add("str", StringType)
     val options = Map("maxCharsPerColumn" -> "2")
 
-    val exception = intercept[SparkException] {
+    val exception = intercept[SparkRuntimeException] {
       df.select(from_csv($"value", schema, options)).collect()
     }.getCause.getMessage
 
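A hedged repro of the behavioral shift this test now pins down (the input string below is an illustrative assumption; the schema and options come from the test):

```scala
// Hypothetical spark-shell session: a field longer than maxCharsPerColumn now
// surfaces a SparkRuntimeException instead of a generic SparkException.
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types._
import spark.implicits._

val df = Seq("a string longer than two characters").toDF("value")
val schema = new StructType().add("str", StringType)
val options = Map("maxCharsPerColumn" -> "2")

df.select(from_csv($"value", schema, options)).collect()
// expected (assumption): org.apache.spark.SparkRuntimeException whose cause is
// univocity's TextParsingException complaining about the maxCharsPerColumn limit.
```
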
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 34 additions & 0 deletions
@@ -85,6 +85,7 @@ abstract class CSVSuite
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
   private val malformedRowFile = "test-data/malformedRow.csv"
   private val charFile = "test-data/char.csv"
+  private val moreColumnsFile = "test-data/more-columns.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -3439,6 +3440,39 @@ abstract class CSVSuite
       expected)
     }
   }
+
+  test("SPARK-49444: CSV parsing failure with more than max columns") {
+    val schema = new StructType()
+      .add("intColumn", IntegerType, nullable = true)
+      .add("decimalColumn", DecimalType(10, 2), nullable = true)
+
+    val fileReadException = intercept[SparkException] {
+      spark
+        .read
+        .schema(schema)
+        .option("header", "false")
+        .option("maxColumns", "2")
+        .csv(testFile(moreColumnsFile))
+        .collect()
+    }
+
+    checkErrorMatchPVals(
+      exception = fileReadException,
+      condition = "FAILED_READ_FILE.NO_HINT",
+      parameters = Map("path" -> s".*$moreColumnsFile"))
+
+    val malformedCSVException = fileReadException.getCause.asInstanceOf[SparkRuntimeException]
+
+    checkError(
+      exception = malformedCSVException,
+      condition = "MALFORMED_CSV_RECORD",
+      parameters = Map("badRecord" -> "1,3.14,string,5,7"),
+      sqlState = "KD000")
+
+    assert(malformedCSVException.getCause.isInstanceOf[TextParsingException])
+    val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
+    assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
