From 96c4953680988739e26b860b24d3966e3cc1cb1f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 14 Oct 2024 16:40:18 +0200 Subject: [PATCH] [SPARK-49955][SQL] null value does not mean corrupted file when parsing JSON string RDD ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/42979 , to fix a regression. For the `spark.read.json(rdd)` API, there is never a corrupted file, and we should not fail if the string value is null with non-failfast parsing mode. This PR is a partial revert of https://github.com/apache/spark/pull/42979 , to not treat `RuntimeException` as a corrupted file when we are not reading from files. ### Why are the changes needed? A query that used to work in 3.5 should still work in 4.0. ### Does this PR introduce _any_ user-facing change? No, as this regression has not been released yet. ### How was this patch tested? New test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #48453 from cloud-fan/json. 
Lead-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Max Gekk --- .../sql/catalyst/json/JsonInferSchema.scala | 6 +++- .../datasources/json/JsonDataSource.scala | 3 +- .../csv/CSVParsingOptionsSuite.scala | 35 +++++++++++++++++++ .../json/JsonParsingOptionsSuite.scala | 11 ++++++ 4 files changed, 53 insertions(+), 2 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParsingOptionsSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index d982e1f19da0c..9c291634401ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -81,7 +81,8 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging { */ def infer[T]( json: RDD[T], - createParser: (JsonFactory, T) => JsonParser): StructType = { + createParser: (JsonFactory, T) => JsonParser, + isReadFile: Boolean = false): StructType = { val parseMode = options.parseMode val columnNameOfCorruptRecord = options.columnNameOfCorruptRecord @@ -96,6 +97,9 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging { Some(inferField(parser)) } } catch { + // If we are not reading from files but hit `RuntimeException`, it means corrupted record. 
+ case e: RuntimeException if !isReadFile => + handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e) case e @ (_: JsonProcessingException | _: MalformedInputException) => handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e) case e: CharConversionException if options.encoding.isEmpty => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 7c98c31bba220..cb4c4f5290880 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -164,7 +164,8 @@ object MultiLineJsonDataSource extends JsonDataSource { .getOrElse(createParser(_: JsonFactory, _: PortableDataStream)) SQLExecution.withSQLConfPropagated(sparkSession) { - new JsonInferSchema(parsedOptions).infer[PortableDataStream](sampled, parser) + new JsonInferSchema(parsedOptions) + .infer[PortableDataStream](sampled, parser, isReadFile = true) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParsingOptionsSuite.scala new file mode 100644 index 0000000000000..8c8304503cef8 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParsingOptionsSuite.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.csv + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.test.SharedSparkSession + +class CSVParsingOptionsSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + test("SPARK-49955: null string value does not mean corrupted file") { + val str = "abc" + val stringDataset = Seq(str, null).toDS() + val df = spark.read.csv(stringDataset) + // `spark.read.csv(rdd)` removes all null values at the beginning. + checkAnswer(df, Seq(Row("abc"))) + val df2 = spark.read.option("mode", "failfast").csv(stringDataset) + checkAnswer(df2, Seq(Row("abc"))) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala index 703085dca66f1..11cc0b99bbde7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonParsingOptionsSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.json +import org.apache.spark.SparkException import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{StringType, StructType} @@ -185,4 +186,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSparkSession { assert(df.first().getString(0) == "Cazen Lee") assert(df.first().getString(1) 
== "$10") } + + test("SPARK-49955: null string value does not mean corrupted file") { + val str = "{\"name\": \"someone\"}" + val stringDataset = Seq(str, null).toDS() + val df = spark.read.json(stringDataset) + checkAnswer(df, Seq(Row(null, "someone"), Row(null, null))) + + val e = intercept[SparkException](spark.read.option("mode", "failfast").json(stringDataset)) + assert(e.getCause.isInstanceOf[NullPointerException]) + } }