[SPARK-26303][SQL] Return partial results for bad JSON records #23253

Closed · 15 commits
4 changes: 3 additions & 1 deletion docs/sql-migration-guide-upgrade.md
@@ -35,7 +35,9 @@ displayTitle: Spark SQL Upgrading Guide

- Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.

-- In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.
+- In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the returned row can contain non-`null` fields if some of CSV column values were parsed and converted to desired types successfully.

+- In Spark version 2.4 and earlier, JSON datasource and JSON functions like `from_json` convert a bad JSON record to a row with all `null`s in the PERMISSIVE mode when specified schema is `StructType`. Since Spark 3.0, the returned row can contain non-`null` fields if some of JSON column values were parsed and converted to desired types successfully.
Contributor:

does from_csv support it?

Member Author:

from_csv was added recently; it didn't exist in 2.4.


Member:

hmm, if the returned row contains non-null fields, how do we know whether the row was read from a bad JSON record or a correct one?

Member:

And this behavior is also documented in some other places, like DataFrameReader.

Member Author:

If the row was produced from a bad JSON record, the bad record is placed in the corrupt column; otherwise the corrupt column contains null.

Member:

What if there is no corrupt column?

Member Author:

In the PERMISSIVE mode, there is no way. But at the moment (without this PR) you cannot distinguish a row produced from a bad record from a row produced from a JSON object with all null fields either.

A row with all nulls cannot by itself be an indicator of a bad record; an additional flag is needed, and a null or non-null value in the corrupt column plays that role.
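
A minimal spark-shell sketch of that flag's role (the schema and data here are illustrative, not from the PR):

```scala
import spark.implicits._

// The corrupt column (default name `_corrupt_record`) marks rows that came
// from bad records: it is null for good records and non-null for bad ones.
val df = spark.read
  .schema("a DOUBLE, b STRING, _corrupt_record STRING")
  .json(Seq(
    """{"a":1.0,"b":"good"}""",  // parses fully -> _corrupt_record stays null
    """{"a":"-","b":"bad"}"""    // bad field -> partial row, raw text kept
  ).toDS())

val badRows  = df.filter($"_corrupt_record".isNotNull)
val goodRows = df.filter($"_corrupt_record".isNull)
```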

Member:

For such a behavior change, shall we add a config to roll back to the previous behavior?

Member:

And you should also update the other places that define the previous behavior, like DataFrameReader.

Member Author:

@cloud-fan This PR proposes changes similar to #23120. Could you take a look at it?

> For such a behavior change, shall we add a config to roll back to the previous behavior?

I don't think it makes sense to introduce a global SQL config for this particular case. The risk of breaking user apps is low, because app logic cannot be based only on the presence of all nulls in a row: all nulls don't differentiate bad from valid JSON records. From my point of view, a note in the migration guide is enough.

Member:

Ok. Sounds reasonable.
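
To make the agreed behavior change concrete, a hedged spark-shell sketch (the input is illustrative):

```scala
import spark.implicits._

// One record whose field `a` cannot be converted to DOUBLE.
val bad = Seq("""{"a":"-","b":"abc"}""").toDS()
val df = spark.read.schema("a DOUBLE, b STRING").json(bad)

df.show()
// Spark 2.4 and earlier (PERMISSIVE): [null, null] -- the whole row is nulled
// Spark 3.0 with this PR:             [null, abc]  -- successfully parsed fields are kept
```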

## Upgrading From Spark SQL 2.3 to 2.4

4 changes: 2 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -211,7 +211,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
@@ -424,7 +424,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
4 changes: 2 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -441,7 +441,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
@@ -648,7 +648,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
-into a field configured by ``columnNameOfCorruptRecord``, and sets other \
+into a field configured by ``columnNameOfCorruptRecord``, and sets malformed \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
JacksonParser.scala
@@ -22,14 +22,14 @@ import java.nio.charset.MalformedInputException

import scala.collection.mutable.ArrayBuffer
import scala.util.Try
+import scala.util.control.NonFatal

import com.fasterxml.jackson.core._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -347,17 +347,28 @@ class JacksonParser(
      schema: StructType,
      fieldConverters: Array[ValueConverter]): InternalRow = {
    val row = new GenericInternalRow(schema.length)
+    var badRecordException: Option[Throwable] = None

    while (nextUntil(parser, JsonToken.END_OBJECT)) {
      schema.getFieldIndex(parser.getCurrentName) match {
        case Some(index) =>
-          row.update(index, fieldConverters(index).apply(parser))
+          try {
+            row.update(index, fieldConverters(index).apply(parser))
+          } catch {
+            case NonFatal(e) =>
+              badRecordException = badRecordException.orElse(Some(e))
+              parser.skipChildren()
+          }
        case None =>
          parser.skipChildren()
      }
    }

-    row
+    if (badRecordException.isEmpty) {
+      row
+    } else {
+      throw PartialResultException(row, badRecordException.get)
+    }
  }
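
The loop above keeps converting the remaining fields after a failure, remembering only the first error; a standalone sketch of the same accumulate-first-error pattern (a hypothetical helper, not PR code):

```scala
import scala.util.control.NonFatal

// Parse every field; on failure keep null for that slot, remember the first
// error, and report it together with the partial result.
def parseAll(fields: Seq[String], parse: String => Int): (Array[Any], Option[Throwable]) = {
  val out = Array.fill[Any](fields.length)(null)
  var firstError: Option[Throwable] = None
  fields.zipWithIndex.foreach { case (f, i) =>
    try out(i) = parse(f)
    catch { case NonFatal(e) => firstError = firstError.orElse(Some(e)) }
  }
  (out, firstError)
}

// e.g. parseAll(Seq("1", "x", "3"), _.toInt) yields (Array(1, null, 3), Some(NumberFormatException))
```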

/**
@@ -428,6 +439,11 @@ class JacksonParser(
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+      case PartialResultException(row, cause) =>
+        throw BadRecordException(
+          record = () => recordLiteral(record),
+          partialResult = () => Some(row),
+          cause)
}
}
}
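
For context, the BadRecordException raised above is consumed higher up by Spark's FailureSafeParser, which in PERMISSIVE mode emits the partial row and stores the raw record in the corrupt column. A simplified sketch of that handling, not the actual implementation:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.BadRecordException
import org.apache.spark.unsafe.types.UTF8String

// Simplified sketch, not Spark's actual FailureSafeParser. `toResultRow` is a
// hypothetical callback that merges a partial row and the raw record text
// into the output row (filling the corrupt column in PERMISSIVE mode).
def parsePermissive(
    rawParser: UTF8String => Iterator[InternalRow],
    toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow)(
    input: UTF8String): Iterator[InternalRow] = {
  try {
    rawParser(input)
  } catch {
    case e: BadRecordException =>
      // With this PR, partialResult() can be Some(row) for bad JSON objects;
      // before, it was None, which produced all-null output rows.
      Iterator(toResultRow(e.partialResult(), e.record))
  }
}
```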
BadRecordException.scala
@@ -20,6 +20,16 @@ package org.apache.spark.sql.catalyst.util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

+/**
+ * Exception thrown when the underlying parser returns a partial result of parsing.
+ * @param partialResult the partial result of parsing a bad record.
+ * @param cause the actual exception about why the parser cannot return full result.
+ */
+case class PartialResultException(
Member:

Ur, is this intentional? It looks like javax.naming.PartialResultException to me. Not only the same name, but also the semantics.

@cloud-fan, is this okay? Or shall we use a more distinguishable name like SparkPartialResultException instead?

Contributor:

We need to carry the InternalRow here. I'm fine with the current name, as we don't have a Spark prefix in BadRecordException.

Member (@dongjoon-hyun, Dec 11, 2018):

I got it~ Thanks.

Member:

Wait... but let's just rename it if possible. The cost of renaming is 0, but there are some benefits to it.

Member:

I mean, we don't have to standardise the name, but let's use another name that doesn't conflict with Java's libraries.

+    partialResult: InternalRow,
+    cause: Throwable)
+  extends Exception(cause)

/**
* Exception thrown when the underlying parser meet a bad record and can't parse it.
* @param record a function to return the record that cause the parser to fail
DataFrameReader.scala
@@ -362,7 +362,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* during parsing.
* <ul>
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
* keep corrupt records, an user can set a string type field named
* `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
* field, it drops corrupt records during parsing. When inferring a schema, it implicitly
@@ -598,13 +598,13 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-   * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   * in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   * during parsing. A record with less/more tokens than schema is not a corrupted record to
-   * CSV. When it meets a record having fewer tokens than the length of the schema, sets
-   * `null` to extra fields. When the record has more tokens than the length of the schema,
-   * it drops extra tokens.</li>
+   * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+   * To keep corrupt records, an user can set a string type field named
+   * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+   * the field, it drops corrupt records during parsing. A record with less/more tokens
+   * than schema is not a corrupted record to CSV. When it meets a record having fewer
+   * tokens than the length of the schema, sets `null` to extra fields. When the record
+   * has more tokens than the length of the schema, it drops extra tokens.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
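A hedged usage sketch of the documented modes (the path and schema are hypothetical; the options shown are the documented ones):

```scala
// PERMISSIVE: keep going, storing raw bad records in the declared corrupt column.
val permissive = spark.read
  .schema("a INT, b STRING, _corrupt_record STRING")
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .csv("/path/to/data.csv")

// FAILFAST: abort on the first corrupted record.
val failFast = spark.read
  .schema("a INT, b STRING")
  .option("mode", "FAILFAST")
  .csv("/path/to/data.csv")
```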
DataStreamReader.scala
@@ -273,7 +273,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* during parsing.
* <ul>
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+   * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`. To
* keep corrupt records, an user can set a string type field named
* `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
* field, it drops corrupt records during parsing. When inferring a schema, it implicitly
@@ -360,13 +360,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
-   * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
-   * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
-   * in an user-defined schema. If a schema does not have the field, it drops corrupt records
-   * during parsing. A record with less/more tokens than schema is not a corrupted record to
-   * CSV. When it meets a record having fewer tokens than the length of the schema, sets
-   * `null` to extra fields. When the record has more tokens than the length of the schema,
-   * it drops extra tokens.</li>
+   * field configured by `columnNameOfCorruptRecord`, and sets malformed fields to `null`.
+   * To keep corrupt records, an user can set a string type field named
+   * `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have
+   * the field, it drops corrupt records during parsing. A record with less/more tokens
+   * than schema is not a corrupted record to CSV. When it meets a record having fewer
+   * tokens than the length of the schema, sets `null` to extra fields. When the record
+   * has more tokens than the length of the schema, it drops extra tokens.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
JsonSuite.scala
@@ -248,7 +248,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {

checkAnswer(
sql("select nullstr, headers.Host from jsonTable"),
Seq(Row("", "1.abc.com"), Row("", null), Row(null, null), Row(null, null))
Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
)
}

@@ -2563,4 +2563,17 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(!files.exists(_.getName.endsWith("json")))
}
}

test("return partial result for bad records") {
val schema = "a double, b array<int>, c string, _corrupt_record string"
val badRecords = Seq(
"""{"a":"-","b":[0, 1, 2],"c":"abc"}""",
"""{"a":0.1,"b":{},"c":"def"}""").toDS()
val df = spark.read.schema(schema).json(badRecords)

checkAnswer(
df,
Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
}
}
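
Per the migration-guide note above, `from_json` should behave the same way in the PERMISSIVE mode; a hedged spark-shell sketch (the input is illustrative):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
import spark.implicits._

val schema = new StructType()
  .add("a", DoubleType)
  .add("b", ArrayType(IntegerType))

Seq("""{"a":"-","b":[0, 1, 2]}""").toDF("json")
  .select(from_json($"json", schema).as("parsed"))
  .show(false)
// With this PR: parsed = [null, [0, 1, 2]]; previously the whole struct was all-null.
```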