Skip to content

[SPARK-26151][SQL] Return partial results for bad CSV records #23120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,21 +243,24 @@ class UnivocityParser(
() => getPartialResult(),
new RuntimeException("Malformed CSV record"))
} else {
try {
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to convert the tokens that correspond to the required columns.
var i = 0
while (i < requiredSchema.length) {
// When the length of the returned tokens is identical to the length of the parsed schema,
// we just need to convert the tokens that correspond to the required columns.
var badRecordException: Option[Throwable] = None
var i = 0
while (i < requiredSchema.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we stop parsing when we hit the first exception?

Copy link
Member Author

@MaxGekk MaxGekk Nov 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but we will lose field values that could be converted successfully after the exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's doable for CSV, as the tokens are separated ahead, and we can keep parsing after an exception. Is it also doable for other text based data sources?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It depends on what kind of error we face to. If a parser is still in normal state and ready to continue, we could skip current error. In case of JSON, we parse input in stream fashion, and convert values to desired type on the fly. If JacksonParser is able to recognize next token, why we should stop on the first error?

try {
row(i) = valueConverters(i).apply(getToken(tokens, i))
i += 1
} catch {
case NonFatal(e) =>
badRecordException = badRecordException.orElse(Some(e))
}
i += 1
}

if (badRecordException.isEmpty) {
row
} catch {
case NonFatal(e) =>
// For corrupted records with the number of tokens same as the schema,
// CSV reader doesn't support partial results. All fields other than the field
// configured by `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => getCurrentInput, () => None, e)
} else {
throw BadRecordException(() => getCurrentInput, () => Some(row), badRecordException.get)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,21 @@ class FailureSafeParser[IN](
private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
private val resultRow = new GenericInternalRow(schema.length)
private val nullResult = new GenericInternalRow(schema.length)

// This function takes 2 parameters: an optional partial result, and the bad record. If the given
// schema doesn't contain a field for corrupted record, we just return the partial result or a
// row with all fields null. If the given schema contains a field for corrupted record, we will
// set the bad record to this field, and set other fields according to the partial result or null.
private val toResultRow: (Option[InternalRow], () => UTF8String) => InternalRow = {
if (corruptFieldIndex.isDefined) {
(row, badRecord) => {
var i = 0
while (i < actualSchema.length) {
val from = actualSchema(i)
resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
i += 1
}
resultRow(corruptFieldIndex.get) = badRecord()
resultRow
(row, badRecord) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this change in FailureSafeParser, does JSON support returning partial result?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now JSON does not support this. Need additional changes in JacksonParser to return partial results.

var i = 0
while (i < actualSchema.length) {
val from = actualSchema(i)
resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
i += 1
}
} else {
(row, _) => row.getOrElse(nullResult)
corruptFieldIndex.foreach(index => resultRow(index) = badRecord())
resultRow
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSQLContext {
"mode" -> "Permissive", "columnNameOfCorruptRecord" -> columnNameOfCorruptRecord)))

checkAnswer(df2, Seq(
Row(Row(null, null, "0,2013-111-11 12:13:14")),
Row(Row(0, null, "0,2013-111-11 12:13:14")),
Row(Row(1, java.sql.Date.valueOf("1983-08-04"), null))))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.schema(schema)
.csv(testFile(valueMalformedFile))
checkAnswer(df1,
Row(null, null) ::
Row(0, null) ::
Row(1, java.sql.Date.valueOf("1983-08-04")) ::
Nil)

Expand All @@ -1131,7 +1131,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.schema(schemaWithCorrField1)
.csv(testFile(valueMalformedFile))
checkAnswer(df2,
Row(null, null, "0,2013-111-11 12:13:14") ::
Row(0, null, "0,2013-111-11 12:13:14") ::
Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
Nil)

Expand All @@ -1148,7 +1148,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
.schema(schemaWithCorrField2)
.csv(testFile(valueMalformedFile))
checkAnswer(df3,
Row(null, "0,2013-111-11 12:13:14", null) ::
Row(0, "0,2013-111-11 12:13:14", null) ::
Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
Nil)

Expand Down