
Commit 433ae90

LuciferYang authored and HyukjinKwon committed
[SPARK-33566][CORE][SQL][SS][PYTHON] Make unescapedQuoteHandling option configurable when reading CSV
### What changes were proposed in this pull request?

There are some differences among Spark CSV, opencsv, and commons-csv; a typical case is described in SPARK-33566: when a value contains both unescaped quotes and an unescaped qualifier, the parsing results differ. The reason for the difference is that Spark uses `STOP_AT_DELIMITER` as the default `UnescapedQuoteHandling` when building `CsvParser`, and it is not configurable. opencsv and commons-csv, on the other hand, default to a parsing mechanism similar to `STOP_AT_CLOSING_QUOTE`. This PR therefore makes the `unescapedQuoteHandling` option configurable, so that the same parsing result as opencsv and commons-csv can be obtained.

### Why are the changes needed?

Making the `unescapedQuoteHandling` option configurable when reading CSV makes parsing more flexible.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Pass the Jenkins or GitHub Action
- Add a new case similar to that described in SPARK-33566

Closes #30518 from LuciferYang/SPARK-33566.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1 parent d082ad0 commit 433ae90
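For illustration only (not part of the commit), a minimal PySpark sketch of the new option; it assumes an active `SparkSession` named `spark`, and the file path is hypothetical:

```python
# Read a CSV whose quoted values contain unescaped quotes, mimicking
# opencsv/commons-csv behaviour via STOP_AT_CLOSING_QUOTE.
# Assumes an active SparkSession `spark`; the path is hypothetical.
df = (spark.read
      .option("header", "true")
      .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
      .csv("/tmp/unescaped_quotes.csv"))
df.show(truncate=False)
```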

File tree

8 files changed: +122 -5 lines

python/pyspark/sql/readwriter.py

Lines changed: 24 additions & 2 deletions
@@ -522,7 +522,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None):
+            pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None,
+            unescapedQuoteHandling=None):
         r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -685,6 +686,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         modifiedAfter (batch only) : an optional timestamp to only include files with
             modification times occurring after the specified time. The provided timestamp
             must be in the following format: YYYY-MM-DDTHH:mm:ss (e.g. 2020-06-01T13:00:00)
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the
+              current parsed value until the delimiter is found. If no delimiter is found in the
+              value, the parser will continue accumulating characters from the input until a
+              delimiter or line ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.

         Examples
         --------
@@ -708,7 +729,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
             enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
             pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
-            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter)
+            modifiedBefore=modifiedBefore, modifiedAfter=modifiedAfter,
+            unescapedQuoteHandling=unescapedQuoteHandling)
         if isinstance(path, str):
             path = [path]
         if type(path) == list:
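A hedged sketch (not part of the diff) contrasting the default mode with `STOP_AT_CLOSING_QUOTE` on the SPARK-33566 sample row; it assumes an active `SparkSession` named `spark` and a writable `/tmp`:

```python
# The value a,""b,c holds both an unescaped delimiter and unescaped quotes.
with open("/tmp/quotes.csv", "w") as f:
    f.write('c1,c2\n"a,""b,c","xyz"\n')

for mode in ("STOP_AT_DELIMITER", "STOP_AT_CLOSING_QUOTE"):
    rows = (spark.read
            .option("header", "true")
            .option("unescapedQuoteHandling", mode)
            .csv("/tmp/quotes.csv")
            .collect())
    print(mode, rows)
# Per the PR description, the default splits the quoted value at the inner
# comma, while STOP_AT_CLOSING_QUOTE recovers ('a,""b,c', 'xyz').
```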

python/pyspark/sql/readwriter.pyi

Lines changed: 1 addition & 0 deletions
@@ -113,6 +113,7 @@ class DataFrameReader(OptionUtils):
         lineSep: Optional[str] = ...,
         pathGlobFilter: Optional[Union[bool, str]] = ...,
         recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
     ) -> DataFrame: ...
     def orc(
         self,

python/pyspark/sql/streaming.py

Lines changed: 23 additions & 2 deletions
@@ -761,7 +761,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
             columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
             enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
-            pathGlobFilter=None, recursiveFileLookup=None):
+            pathGlobFilter=None, recursiveFileLookup=None, unescapedQuoteHandling=None):
         r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

         This function will go through the input once to determine the input schema if
@@ -900,6 +900,26 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
         recursiveFileLookup : str or bool, optional
             recursively scan a directory for files. Using this option disables
             `partition discovery <https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery>`_.  # noqa
+        unescapedQuoteHandling : str, optional
+            defines how the CsvParser will handle values with unescaped quotes. If None is
+            set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+            * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate
+              the quote character and proceed parsing the value as a quoted value, until a closing
+              quote is found.
+            * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters of the
+              current parsed value until the delimiter is found. If no delimiter is found in the
+              value, the parser will continue accumulating characters from the input until a
+              delimiter or line ending is found.
+            * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value
+              as an unquoted value. This will make the parser accumulate all characters until the
+              delimiter or a line ending is found in the input.
+            * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed
+              for the given value will be skipped and the value set in nullValue will be produced
+              instead.
+            * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException
+              will be thrown.

         .. versionadded:: 2.0.0
@@ -926,7 +946,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
             charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
             emptyValue=emptyValue, locale=locale, lineSep=lineSep,
-            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
+            pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup,
+            unescapedQuoteHandling=unescapedQuoteHandling)
         if isinstance(path, str):
             return self._df(self._jreader.csv(path))
         else:
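The same option applies to `DataStreamReader.csv`; a minimal streaming sketch under the same assumptions (file-source streams need an explicit schema, and the input directory is hypothetical):

```python
from pyspark.sql.types import StructField, StructType, StringType

schema = StructType([
    StructField("c1", StringType()),
    StructField("c2", StringType()),
])
stream_df = (spark.readStream
             .schema(schema)  # required for file-source streams
             .option("header", "true")
             .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
             .csv("/tmp/csv-input/"))
```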

python/pyspark/sql/streaming.pyi

Lines changed: 1 addition & 0 deletions
@@ -149,6 +149,7 @@ class DataStreamReader(OptionUtils):
         lineSep: Optional[str] = ...,
         pathGlobFilter: Optional[Union[bool, str]] = ...,
         recursiveFileLookup: Optional[Union[bool, str]] = ...,
+        unescapedQuoteHandling: Optional[str] = ...,
     ) -> DataFrame: ...

 class DataStreamWriter:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala

Lines changed: 7 additions & 1 deletion
@@ -213,6 +213,12 @@ class CSVOptions(
   }
   val lineSeparatorInWrite: Option[String] = lineSeparator

+  /**
+   * The handling method to be used when unescaped quotes are found in the input.
+   */
+  val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
+    .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
     settings.setNullValue(nullValue)
     settings.setEmptyValue(emptyValueInRead)
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+    settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
     settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
     lineSeparatorInRead.foreach { _ =>
       settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
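Note that `CSVOptions` upper-cases the supplied value with `Locale.ROOT` before the `valueOf` lookup, so a lower-case spelling should resolve to the same enum constant, while an unrecognized value would surface as an `IllegalArgumentException` from `valueOf`. A hedged sketch:

```python
# Lower-case spelling should be accepted because CSVOptions upper-cases the
# value before the enum lookup; the path is hypothetical.
df = (spark.read
      .option("unescapedQuoteHandling", "stop_at_closing_quote")
      .csv("/tmp/unescaped_quotes.csv"))
```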

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 21 additions & 0 deletions
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing. It supports the following case-insensitive modes. Note that Spark tries
    * to parse only required columns in CSV under column pruning. Therefore, corrupt records

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

Lines changed: 21 additions & 0 deletions
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing. It supports the following case-insensitive modes.
    * <ul>

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

Lines changed: 24 additions & 0 deletions
@@ -2428,6 +2428,30 @@ abstract class CSVSuite
       assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
     }
   }
+
+  test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
+    "unescaped quotes and unescaped delimiter data correctly") {
+    withTempPath { path =>
+      val dataPath = path.getCanonicalPath
+      val row1 = Row("""a,""b,c""", "xyz")
+      val row2 = Row("""a,b,c""", """x""yz""")
+      // Generate the test data, using `,` as the delimiter and `"` as the quote
+      // character, with neither of them escaped.
+      Seq(
+        """c1,c2""",
+        s""""${row1.getString(0)}","${row1.getString(1)}"""",
+        s""""${row2.getString(0)}","${row2.getString(1)}"""")
+        .toDF().repartition(1).write.text(dataPath)
+      // Without setting unescapedQuoteHandling to STOP_AT_CLOSING_QUOTE, the result
+      // would be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
+      val result = spark.read
+        .option("inferSchema", "true")
+        .option("header", "true")
+        .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
+        .csv(dataPath).collect()
+      val expectedResults = Array(row1, row2)
+      assert(result.sameElements(expectedResults))
+    }
+  }
 }

 class CSVv1Suite extends CSVSuite {
