[SPARK-49955][SQL] null value does not mean corrupted file when parsing JSON string RDD

### What changes were proposed in this pull request?

This is a follow-up of apache#42979 to fix a regression. For the `spark.read.json(rdd)` API there is never a corrupted file, so we should not fail when a string value is null under a non-failfast parsing mode. This PR partially reverts apache#42979 so that `RuntimeException` is no longer treated as a corrupted file when we are not reading from files.
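As an illustration of the restored behavior, here is a minimal sketch (assuming a local `SparkSession`; `spark.read.json` also accepts a `Dataset[String]`, which is the variant exercised by the new tests below):

```scala
import org.apache.spark.sql.SparkSession

object NullJsonStringExample extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // A string dataset containing a null element, e.g. produced upstream by an RDD.
  val jsonStrings = Seq("""{"name": "someone"}""", null).toDS()

  // There is no file here, so a null value must not be reported as a
  // corrupted *file*; under the default (permissive) mode the read succeeds
  // and the null input simply yields a row of nulls.
  spark.read.json(jsonStrings).show()

  spark.stop()
}
```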

### Why are the changes needed?

A query that used to work in Spark 3.5 should still work in Spark 4.0.

### Does this PR introduce _any_ user-facing change?

No, as this regression has not been released yet.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48453 from cloud-fan/json.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
2 people authored and MaxGekk committed Oct 14, 2024
1 parent 0606512 commit 96c4953
Showing 4 changed files with 53 additions and 2 deletions.
`JsonInferSchema.scala`:

```diff
@@ -81,7 +81,8 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
    */
   def infer[T](
       json: RDD[T],
-      createParser: (JsonFactory, T) => JsonParser): StructType = {
+      createParser: (JsonFactory, T) => JsonParser,
+      isReadFile: Boolean = false): StructType = {
     val parseMode = options.parseMode
     val columnNameOfCorruptRecord = options.columnNameOfCorruptRecord
 
@@ -96,6 +97,9 @@ class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
             Some(inferField(parser))
           }
         } catch {
+          // If we are not reading from files but hit `RuntimeException`, it means corrupted record.
+          case e: RuntimeException if !isReadFile =>
+            handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
           case e @ (_: JsonProcessingException | _: MalformedInputException) =>
             handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
           case e: CharConversionException if options.encoding.isEmpty =>
```
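For context, `handleJsonErrorsByParseMode` (already present before this change) dispatches on the configured parse mode. A simplified sketch of that dispatch, not the exact Spark source:

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Simplified sketch: keep the record as a corrupt-record column, drop it,
// or fail the whole query, depending on the configured parse mode.
def handleJsonErrorsByParseMode(
    parseMode: ParseMode,
    columnNameOfCorruptRecord: String,
    e: Throwable): Option[StructType] = parseMode match {
  case PermissiveMode =>
    // Infer only the corrupt-record column for this record.
    Some(StructType(Array(StructField(columnNameOfCorruptRecord, StringType))))
  case DropMalformedMode =>
    // Silently skip the malformed record during schema inference.
    None
  case FailFastMode =>
    // Surface the original exception as the cause and abort the query.
    throw new SparkException("Malformed records are detected in schema inference.", e)
}
```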
`MultiLineJsonDataSource` (in `JsonDataSource.scala`):

```diff
@@ -164,7 +164,8 @@ object MultiLineJsonDataSource extends JsonDataSource {
       .getOrElse(createParser(_: JsonFactory, _: PortableDataStream))
 
     SQLExecution.withSQLConfPropagated(sparkSession) {
-      new JsonInferSchema(parsedOptions).infer[PortableDataStream](sampled, parser)
+      new JsonInferSchema(parsedOptions)
+        .infer[PortableDataStream](sampled, parser, isReadFile = true)
     }
   }
 
```
New file `CSVParsingOptionsSuite.scala` (@@ -0,0 +1,35 @@):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.csv

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession

class CSVParsingOptionsSuite extends QueryTest with SharedSparkSession {
  import testImplicits._

  test("SPARK-49955: null string value does not mean corrupted file") {
    val str = "abc"
    val stringDataset = Seq(str, null).toDS()
    val df = spark.read.csv(stringDataset)
    // `spark.read.csv(rdd)` removes all null values at the beginning.
    checkAnswer(df, Seq(Row("abc")))
    val df2 = spark.read.option("mode", "failfast").csv(stringDataset)
    checkAnswer(df2, Seq(Row("abc")))
  }
}
```
`JsonParsingOptionsSuite.scala`:

```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.json
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{StringType, StructType}
@@ -185,4 +186,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSparkSession {
     assert(df.first().getString(0) == "Cazen Lee")
     assert(df.first().getString(1) == "$10")
   }
+
+  test("SPARK-49955: null string value does not mean corrupted file") {
+    val str = "{\"name\": \"someone\"}"
+    val stringDataset = Seq(str, null).toDS()
+    val df = spark.read.json(stringDataset)
+    checkAnswer(df, Seq(Row(null, "someone"), Row(null, null)))
+
+    val e = intercept[SparkException](spark.read.option("mode", "failfast").json(stringDataset))
+    assert(e.getCause.isInstanceOf[NullPointerException])
+  }
 }
```
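The commit title mentions the JSON string RDD entry point; for completeness, a hypothetical reproduction through that (deprecated) overload might look like the following, under the same assumptions as the sketch above:

```scala
import org.apache.spark.sql.SparkSession

object NullJsonRddExample extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  // The RDD[String] overload from the commit title; deprecated since Spark 2.2
  // in favor of the Dataset[String] variant used in the tests above.
  val rdd = spark.sparkContext.parallelize(Seq("""{"name": "someone"}""", null))
  spark.read.json(rdd).show() // must not fail under non-failfast parse modes

  spark.stop()
}
```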
