Commit 976ac5e

MaxGekk authored and Jackey Lee committed
[SPARK-26303][SQL] Return partial results for bad JSON records
## What changes were proposed in this pull request?

In the PR, I propose to return partial results from the JSON datasource and JSON functions in the PERMISSIVE mode if some of the JSON fields are parsed and converted to the desired types successfully. The changes are made only for `StructType`. Whole bad JSON records are placed into the corrupt column specified by the `columnNameOfCorruptRecord` option or SQL config. Partial results are not returned for malformed JSON input.

## How was this patch tested?

Added a new UT which checks converting JSON strings with one invalid and one valid field at the end of the string.

Closes apache#23253 from MaxGekk/json-bad-record.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent d0a8b20 commit 976ac5e
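As a minimal, end-to-end illustration of the behavior this commit introduces, reading a record whose first field cannot be converted now yields a partially filled row instead of an all-null one. This is a sketch, not part of the change itself: the SparkSession setup, app name, and local master are assumptions, and the schema mirrors the new unit test in `JsonSuite.scala` below.

```scala
import org.apache.spark.sql.SparkSession

object PartialJsonResultExample {
  def main(args: Array[String]): Unit = {
    // Assumed local setup; app name and master are illustrative.
    val spark = SparkSession.builder()
      .appName("partial-results-for-bad-json")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Field `a` cannot be converted to double, while `b` and `c` parse fine.
    val badRecord = Seq("""{"a":"-","b":[0,1,2],"c":"abc"}""").toDS()

    val df = spark.read
      .schema("a double, b array<int>, c string, _corrupt_record string")
      .json(badRecord)

    // With this change the row keeps the converted fields `b` and `c`, and the whole
    // record is stored in `_corrupt_record`; previously all data fields were null.
    df.show(truncate = false)

    spark.stop()
  }
}
```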

File tree

8 files changed (+67, -26 lines)


docs/sql-migration-guide-upgrade.md

Lines changed: 3 additions & 1 deletion

@@ -37,7 +37,9 @@ displayTitle: Spark SQL Upgrading Guide
 
   - Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
 
-  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+  - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+
+  - In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
 
   ## Upgrading From Spark SQL 2.3 to 2.4
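The JSON-functions side of the migration note above can be sketched with `from_json`, which uses the PERMISSIVE mode by default. This is a minimal example under an assumed local session; the column name `json` and the two-field schema are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

object FromJsonPartialResult {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("from-json-partial").getOrCreate()
    import spark.implicits._

    val schema = new StructType().add("a", DoubleType).add("b", StringType)

    // `a` cannot be converted to double, `b` is fine.
    val df = Seq("""{"a":"-","b":"abc"}""").toDF("json")

    // On Spark 3.0 the resulting struct keeps b = "abc" with a = null;
    // on 2.4 and earlier the struct's fields were all set to null.
    df.select(from_json($"json", schema).as("parsed")).show(truncate = false)

    spark.stop()
  }
}
```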

python/pyspark/sql/readwriter.py

Lines changed: 2 additions & 2 deletions

@@ -211,7 +211,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         set, it uses the default value, ``PERMISSIVE``.
 
         * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-          into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+          into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
           fields to ``null``. To keep corrupt records, an user can set a string type \
           field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
           schema does not have the field, it drops corrupt records during parsing. \

@@ -424,7 +424,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         set, it uses the default value, ``PERMISSIVE``.
 
         * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-          into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+          into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
           fields to ``null``. To keep corrupt records, an user can set a string type \
           field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
           schema does not have the field, it drops corrupt records during parsing. \

python/pyspark/sql/streaming.py

Lines changed: 2 additions & 2 deletions

@@ -441,7 +441,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         set, it uses the default value, ``PERMISSIVE``.
 
         * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-          into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+          into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
           fields to ``null``. To keep corrupt records, an user can set a string type \
           field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
           schema does not have the field, it drops corrupt records during parsing. \

@@ -648,7 +648,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         set, it uses the default value, ``PERMISSIVE``.
 
         * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-          into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+          into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
           fields to ``null``. To keep corrupt records, an user can set a string type \
           field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
           schema does not have the field, it drops corrupt records during parsing. \

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala

Lines changed: 20 additions & 4 deletions

@@ -22,14 +22,14 @@ import java.nio.charset.MalformedInputException
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
+import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.core._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils

@@ -347,17 +347,28 @@ class JacksonParser(
       schema: StructType,
       fieldConverters: Array[ValueConverter]): InternalRow = {
     val row = new GenericInternalRow(schema.length)
+    var badRecordException: Option[Throwable] = None
+
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
         case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
-
+          try {
+            row.update(index, fieldConverters(index).apply(parser))
+          } catch {
+            case NonFatal(e) =>
+              badRecordException = badRecordException.orElse(Some(e))
+              parser.skipChildren()
+          }
         case None =>
           parser.skipChildren()
       }
     }
 
-    row
+    if (badRecordException.isEmpty) {
+      row
+    } else {
+      throw PartialResultException(row, badRecordException.get)
+    }
   }
 
   /**

@@ -428,6 +439,11 @@ class JacksonParser(
         val wrappedCharException = new CharConversionException(msg)
         wrappedCharException.initCause(e)
         throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+      case PartialResultException(row, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResult = () => Some(row),
+          cause)
     }
   }
 }
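The heart of the `convertObject` change is a convert-what-you-can loop: remember only the first conversion failure, keep converting the remaining fields, and rethrow the partially filled row together with that failure. Below is a simplified, self-contained sketch of the pattern in plain Scala; `PartialResult` is a hypothetical stand-in for Spark's `PartialResultException` and `InternalRow`, not the actual classes.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for PartialResultException: carries the partially filled row.
final case class PartialResult(row: Array[Any], cause: Throwable) extends Exception(cause)

object PartialParsingSketch {
  // Convert each raw field with its converter; a failure does not abort the whole row.
  def convertObject(fields: Seq[String], converters: Array[String => Any]): Array[Any] = {
    val row = new Array[Any](fields.length)
    var firstError: Option[Throwable] = None

    fields.zipWithIndex.foreach { case (raw, i) =>
      try {
        row(i) = converters(i)(raw) // convert this field
      } catch {
        case NonFatal(e) =>
          firstError = firstError.orElse(Some(e)) // keep only the first failure
      }
    }

    if (firstError.isEmpty) row
    else throw PartialResult(row, firstError.get) // the partial row travels with the error
  }

  def main(args: Array[String]): Unit = {
    val converters: Array[String => Any] = Array(_.toDouble, identity[String])
    try {
      convertObject(Seq("-", "abc"), converters) // "-" is not a valid double
    } catch {
      case PartialResult(row, cause) =>
        // Prints the partially converted row and the first failure's message.
        println(s"partial row: ${row.mkString("[", ", ", "]")} (cause: ${cause.getMessage})")
    }
  }
}
```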

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/BadRecordException.scala

Lines changed: 10 additions & 0 deletions

@@ -20,6 +20,16 @@ package org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * Exception thrown when the underlying parser returns a partial result of parsing.
+ * @param partialResult the partial result of parsing a bad record.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialResultException(
+    partialResult: InternalRow,
+    cause: Throwable)
+  extends Exception(cause)
+
 /**
  * Exception thrown when the underlying parser meet a bad record and can't parse it.
  * @param record a function to return the record that cause the parser to fail

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 8 additions & 8 deletions

@@ -362,7 +362,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 * during parsing.
 * <ul>
 * <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+* field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
 * keep corrupt records, an user can set a string type field named
 * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
 * field, it drops corrupt records during parsing. When inferring a schema, it implicitly

@@ -598,13 +598,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 * during parsing. It supports the following case-insensitive modes.
 * <ul>
 * <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-* in an user-defined schema. If a schema does not have the field, it drops corrupt records
-* during parsing. A record with less/more tokens than schema is not a corrupted record to
-* CSV. When it meets a record having fewer tokens than the length of the schema, sets
-* `null` to extra fields. When the record has more tokens than the length of the schema,
-* it drops extra tokens.</li>
+* field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+* To keep corrupt records, an user can set a string type field named
+* `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+* the field, it drops corrupt records during parsing. A record with less/more tokens
+* than schema is not a corrupted record to CSV. When it meets a record having fewer
+* tokens than the length of the schema, sets `null` to extra fields. When the record
+* has more tokens than the length of the schema, it drops extra tokens.</li>
 * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
 * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
 * </ul>
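The PERMISSIVE/`columnNameOfCorruptRecord` contract documented in this Scaladoc can be exercised with a short CSV read. This is a minimal sketch under an assumed local session; the sample lines, the `_bad` column name, and the schema are illustrative:

```scala
import org.apache.spark.sql.SparkSession

object PermissiveCsvSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("permissive-csv").getOrCreate()
    import spark.implicits._

    // The second line has a non-integer id and cannot be fully converted.
    val lines = Seq("1,abc", "oops,def").toDS()

    val df = spark.read
      .schema("id int, name string, _bad string") // corrupt column must be declared string-typed
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_bad")
      .csv(lines)

    // Malformed fields become null; the raw line is kept in `_bad`.
    df.show(truncate = false)

    spark.stop()
  }
}
```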

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 8 additions & 8 deletions

@@ -273,7 +273,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * during parsing.
 * <ul>
 * <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+* field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
 * keep corrupt records, an user can set a string type field named
 * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
 * field, it drops corrupt records during parsing. When inferring a schema, it implicitly

@@ -360,13 +360,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * during parsing. It supports the following case-insensitive modes.
 * <ul>
 * <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-* in an user-defined schema. If a schema does not have the field, it drops corrupt records
-* during parsing. A record with less/more tokens than schema is not a corrupted record to
-* CSV. When it meets a record having fewer tokens than the length of the schema, sets
-* `null` to extra fields. When the record has more tokens than the length of the schema,
-* it drops extra tokens.</li>
+* field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+* To keep corrupt records, an user can set a string type field named
+* `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+* the field, it drops corrupt records during parsing. A record with less/more tokens
+* than schema is not a corrupted record to CSV. When it meets a record having fewer
+* tokens than the length of the schema, sets `null` to extra fields. When the record
+* has more tokens than the length of the schema, it drops extra tokens.</li>
 * <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
 * <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
 * </ul>

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala

Lines changed: 14 additions & 1 deletion

@@ -248,7 +248,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
     checkAnswer(
       sql("select nullstr, headers.Host from jsonTable"),
-      Seq(Row("", "1.abc.com"), Row("", null), Row(null, null), Row(null, null))
+      Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
     )
   }
 

@@ -2563,4 +2563,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       assert(!files.exists(_.getName.endsWith("json")))
     }
   }
+
+  test("return partial result for bad records") {
+    val schema = "a double, b array<int>, c string, _corrupt_record string"
+    val badRecords = Seq(
+      """{"a":"-","b":[0, 1, 2],"c":"abc"}""",
+      """{"a":0.1,"b":{},"c":"def"}""").toDS()
+    val df = spark.read.schema(schema).json(badRecords)
+
+    checkAnswer(
+      df,
+      Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
+      Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
+  }
 }
