[SPARK-26745][SQL] Skip empty lines in JSON-derived DataFrames when skipParsing optimization in effect #23665
    @@ -28,7 +28,8 @@ class FailureSafeParser[IN](
         mode: ParseMode,
         schema: StructType,
         columnNameOfCorruptRecord: String,
    -    isMultiLine: Boolean) {
    +    isMultiLine: Boolean,
    +    unparsedRecordIsNonEmpty: IN => Boolean = (_: IN) => true) {

       private val corruptFieldIndex = schema.getFieldIndex(columnNameOfCorruptRecord)
       private val actualSchema = StructType(schema.filterNot(_.name == columnNameOfCorruptRecord))
    @@ -55,11 +56,15 @@ class FailureSafeParser[IN](

       def parse(input: IN): Iterator[InternalRow] = {
         try {
    -      if (skipParsing) {
    -        Iterator.single(InternalRow.empty)
    -      } else {
    -        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
    -      }
    +      if (skipParsing) {
    +        if (unparsedRecordIsNonEmpty(input)) {
    +          Iterator.single(InternalRow.empty)
    +        } else {
    +          Iterator.empty
    +        }
    +      } else {
    +        rawParser.apply(input).toIterator.map(row => toResultRow(Some(row), () => null))
    +      }
Review comment: Do we need to modify this file? Since this is an issue in the JSON code, I think it's better to handle this case on the JSON parser side. Can't we handle it the same way as the CSV one, e.g. `spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala`, line 333 in 3a17c6a?

Review comment: One of my earlier revisions worked in the way you suggest (if I've understood your point correctly); I changed it in order to avoid redundant empty-line filtering in the case where the full parser has to run anyway (i.e. where `skipParsing` is false). What do you think? Is that a valid optimization, or is it better to do it on the JSON side as in 13942b8 to avoid changes to `FailureSafeParser`?

Review comment: I thought this: maropu@f4df907

         } catch {
           case e: BadRecordException => mode match {
             case PermissiveMode =>
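To make the intent of the new hook concrete, here is a minimal sketch of the kind of predicate `unparsedRecordIsNonEmpty` is meant to be, assuming the line-based JSON path hands the parser one line of text per record (plain `String` is used here for readability; the real `IN` type depends on the data source):

```scala
// Hypothetical stand-in for the predicate passed as unparsedRecordIsNonEmpty.
// A whitespace-only line carries no record, so the skip-parsing count path
// should not emit a row for it.
def unparsedRecordIsNonEmpty(line: String): Boolean =
  line.exists(ch => !Character.isWhitespace(ch))

// With skipParsing in effect, parse() would then yield:
//   Iterator.single(InternalRow.empty) for a line holding an object
//   Iterator.empty                     for a whitespace-only line
assert(unparsedRecordIsNonEmpty("""  { "a": 1 }"""))
assert(!unparsedRecordIsNonEmpty("   \t   "))
```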
src/test/resources/test-data/with-empty-line.json (new file):

    @@ -0,0 +1,7 @@
    +{ "a" : 1 , "b" : 2 , "c" : 3 }
    +
    +{ "a" : 4 , "b" : 5 , "c" : 6 }
    +
    +{ "a" : 7 , "b" : 8 , "c" : 9 }
    +
    +
    @@ -2426,6 +2426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         countForMalformedJSON(0, Seq(""))
       }

    +  test("count() for non-multiline input with empty lines") {
    +    val withEmptyLineData = Array(Map("a" -> 1, "b" -> 2, "c" -> 3),
    +      Map("a" -> 4, "b" -> 5, "c" -> 6),
    +      Map("a" -> 7, "b" -> 8, "c" -> 9))
    +    val df = spark.read.json("src/test/resources/test-data/with-empty-line.json")

Review comment: plz use …

    +    // important to do this .count() first, prior to caching/persisting/computing/collecting, to
    +    // test the non-parsed-count pathway
    +    assert(df.count() === withEmptyLineData.length,
    +      "JSON DataFrame unparsed-count should exclude whitespace-only lines")
    +    // cache and collect to check that count stays stable under those operations
    +    df.cache()

Review comment: we don't need this cache.

    +    assert(df.count() === withEmptyLineData.length,
    +      "JSON DataFrame parsed-count should exclude whitespace-only lines")
    +    val collected = df.collect().map(_.getValuesMap(Seq("a", "b", "c")))
    +    assert(collected === withEmptyLineData)

Review comment: plz check …

    +  }
    +
       test("SPARK-25040: empty strings should be disallowed") {
         def failedOnEmptyString(dataType: DataType): Unit = {
           val df = spark.read.schema(s"a ${dataType.catalogString}")
Review comment: I think we should rather revert #21909. I think #21909 was a band-aid fix and this is another band-aid fix on top of it. `JacksonParser` itself can produce no record or multiple records. The previous code path assumed that it always produces a single record, and the current fix checks the input again outside of `JacksonParser`. There is another problem from #21909: it also looks like it will produce incorrect counts when the input JSON is an array.

Current master:
Spark 2.3.1:
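For reference, here is a sketch of the kind of input being described: a single line whose top-level JSON value is an array. Full parsing produces one row per array element, while a count shortcut that only looks at lines sees just one; the exact outputs for master and 2.3.1 were not captured above, so the comments below only indicate the expected shape of the discrepancy.

```scala
import spark.implicits._  // assumes an active SparkSession named `spark`

// One input line, but the top-level JSON value is an array with two elements.
val ds = Seq("""[{"a": 1}, {"a": 2}]""").toDS()
val df = spark.read.json(ds)

df.show()   // full parsing: one row per array element
df.count()  // a line-based count shortcut sees a single line, so the result
            // can disagree with the number of rows shown above
```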
Review comment: cc @cloud-fan and @gatorsmile, if you don't mind, I would like to revert #21909. WDYT?
Review comment: That's a good catch! I think the idea of the count optimization still makes sense, but our parser is so flexible that there are many corner cases we need to think of. cc @MaxGekk, how hard do you think it is to fix? If it's too hard, +1 to revert it. A safer approach may be to only enable this count optimization in certain cases that are 100% safe (a whitelist).
Review comment: Yea, that's the safer approach if possible, but to do that we would have to manually check the input like the current PR does. It doesn't look like a good idea to me to check the input outside of `JacksonParser`. The problem is, we cannot distinguish the cases below without parsing: one line (`input: IN`) can be 0 records, 1 record, or multiple records.
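A rough illustration of those three cases, one candidate line each (these inputs are illustrative, not the project's test data):

```scala
// One unparsed line can correspond to zero, one, or many records.
val zeroRecords = "   "                          // blank line: nothing to count
val oneRecord   = """{"a": 1}"""                 // single object: one row
val manyRecords = """[{"a": 1}, {"a": 2}]"""     // top-level array: several rows
```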
Review comment: The case when a user sets a `StructType` for arrays can be excluded from the count optimization in advance. Regarding empty (blank) strings: before #23543 they were considered bad records (they appeared in results), and `count()` produced pretty consistent results. As you know, in the case of `count` we have an empty `StructType` as the required schema. It means we don't have to fully parse the input and convert all fields to the desired types. It means `count()` can "count" bad records, and we cannot compare the number of rows in `show()` output to `count()`; there are always cases where the numbers can differ. I think we should answer a more generic question: which input conforms to the empty `StructType()`? After that it should be clear what kind of optimization can be applied for count(). Possible answers:

- `linesReader.filter(_.getLength > 0)`
- `FailureSafeParser` doesn't produce bad records (in `PERMISSIVE` mode).
- Text that opens with `{` and is closed by `}`, and anything in between.

Till we answer the above question, reverting #21909 just moves us from one "bad" behavior to another "bad" behavior.
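As a sketch of the first option, assuming the line-based read path exposes its input as an iterator of Hadoop `Text` values (as `HadoopFileLinesReader` does), zero-length lines could be dropped before they ever reach the parser; note this alone would not catch lines containing only whitespace:

```scala
import org.apache.hadoop.io.Text

// Illustrative only: filter out zero-length lines so the count-only path
// never sees them. Whitespace-only lines would still slip through.
def dropEmptyLines(linesReader: Iterator[Text]): Iterator[Text] =
  linesReader.filter(_.getLength > 0)
```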
Review comment: I think the key here is that one line can produce 0, 1, or more records; how do we speed it up when we only care about counts? It looks to me that we can enable the count optimization only for `{...}`, and fall back to parsing for other cases. @MaxGekk do you think this is applicable? If it's a simple fix, let's do it for branch 2.4 as well; otherwise +1 for reverting it from 2.4 and re-doing it in master.
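A minimal sketch of that whitelist idea, assuming the only shape eligible for the fast count path is a single top-level object and everything else falls back to real parsing; this is an approximation (it does not validate what sits between the braces):

```scala
// Heuristic check: does this line look like exactly one top-level object?
def eligibleForCountShortcut(line: String): Boolean = {
  val trimmed = line.trim
  trimmed.startsWith("{") && trimmed.endsWith("}")
}

eligibleForCountShortcut("""{ "a": 1 }""")           // true:  fast count path
eligibleForCountShortcut("""[{"a": 1}, {"a": 2}]""") // false: parse for real
eligibleForCountShortcut("")                         // false: parse (and drop)
```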
Review comment: Outside of datasources, `count()` has a very well-defined semantic: the number of rows that match the required schema. In the case of `count()`, the required schema is `Struct(Array())`. From the user's view, `count()` doesn't require any field to be present in a counted row. If you would like to see the same number of rows in `show()` output as `count()` returns, you need to push the full datasource schema as the required schema; otherwise there is always malformed input on which you will see different results.
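For concreteness, the required schema referred to here is the empty struct, which every input row trivially matches; a tiny sketch:

```scala
import org.apache.spark.sql.types.{StructField, StructType}

// The required schema pushed down for count(): no fields at all, so even a
// record that would be flagged as malformed under the full schema "matches" it.
val requiredSchemaForCount = StructType(Array.empty[StructField])
assert(requiredSchemaForCount.isEmpty)
```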
Review comment: I think for permissive mode the results (at least the counts) are always the same even if some input is malformed? Otherwise, it seems like users only want to count the number of lines, and they should read the JSON files as text and do the count.
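For comparison, "read the JSON files as text and do count" would look like the sketch below (the path is illustrative); it counts every physical line, blank or malformed, which is the semantics the JSON reader's count() should not silently adopt:

```scala
// Counts raw lines, including blank and malformed ones.
val rawLineCount = spark.read.textFile("/path/to/input.json").count()

// Counts rows produced by the JSON reader, which is what this thread is about.
val jsonRowCount = spark.read.json("/path/to/input.json").count()
```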
Review comment: re: #23665 (comment)

This is reasoning about the count (see below; it continues).

To be 100% sure about correct results, we should always parse everything; we're doing it the current way for optimization, and it has started to produce some inconsistent results.

Yes, so we don't convert 100%. In that case, we should at least parse against `StructType()`, which I guess means an empty object `{...}`. I think that's what JSON did before the PRs pointed out above.

Yes, I agree. It shouldn't behave like the text source plus `count()`. Let's revert anyway; I don't think this behaviour is ideal.

For other behaviours, I was thinking about making a `README.md` that whitelists behaviours for both CSV and JSON for Spark developers, somewhere under the related JSON and CSV directories. It's a bit of a grunt job but it sounds like it should be done. I could do this together with @MaxGekk since he has worked on this area a lot as well.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can but we should add a if-else for each input + checking some characters since it targets to avoid to parse.