
[SPARK-26108][SQL] Support custom lineSep in CSV datasource #23080

Closed
wants to merge 17 commits
13 changes: 9 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -353,7 +353,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None):
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -453,6 +453,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.

>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -472,7 +475,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale)
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
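For reference, a minimal doctest-style sketch of the new reader option. The temporary path and sample data are illustrative; the 0x1E record separator mirrors one of the characters exercised by the new tests in this PR.

>>> # write a small file whose records end with the ASCII record separator (0x1E)
>>> with open('/tmp/rs_separated.csv', 'w', newline='') as f:
...     _ = f.write('a,1\x1eb,2\x1ec,3')
>>> spark.read.csv('/tmp/rs_separated.csv', lineSep='\x1e').count()
3
>>> # omit lineSep to keep the default: \r, \r\n and \n are all recognized as record ends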
@@ -868,7 +871,7 @@ def text(self, path, compression=None, lineSep=None):
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None):
charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None, lineSep=None):
r"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.

:param path: the path in any Hadoop supported file system
@@ -922,6 +925,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
the default UTF-8 charset will be used.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, ``""``.
:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``. Maximum length is 1 character.

Not sure if I'm missing something, but has this removed the ability to use \r\n?

Member

Spark never supported \r\n in the writing path.


Revisiting this since I'd like to get rid of a local patch.

Why do you say it doesn't support this?

Reverting to the two-character restriction works in my testing, on both the read and write paths, using arbitrary two-character delimiters.


Sorry for the extra comments: hadn't read deeply enough.

So the problem is Univocity's normalizedNewLine stuff? It fails in multiline cases? That's what I'm seeing in the tests and would explain why I don't see it in my use cases.

If that's the case, I'm wondering if it's okay to allow two characters for the non-multiline cases?

Member

Please file a JIRA and go ahead if you can.


ack
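As a hedged illustration of the write-path behaviour discussed in this thread (the output directory name below is hypothetical), a single-character separator is accepted, while a two-character one such as \r\n should be rejected by the length check added in CSVOptions:

>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'rs_data'), lineSep='\x1e')
>>> # df.write.csv(..., lineSep='\r\n') should fail the 'lineSep' length check:
>>> # "'lineSep' can contain only 1 character."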


>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
@@ -932,7 +937,7 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping,
encoding=encoding, emptyValue=emptyValue)
encoding=encoding, emptyValue=emptyValue, lineSep=lineSep)
self._jwrite.csv(path)

@since(1.5)
7 changes: 5 additions & 2 deletions python/pyspark/sql/streaming.py
@@ -576,7 +576,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None):
enforceSchema=None, emptyValue=None, locale=None, lineSep=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -675,6 +675,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.

>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -692,7 +695,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale)
emptyValue=emptyValue, locale=locale, lineSep=lineSep)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
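A short sketch of the same option on the streaming reader; the input directory is a fresh temp directory and `sdf_schema` is the schema object used by the surrounding doctests.

>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema=sdf_schema, lineSep='\x1e')
>>> csv_sdf.isStreaming
True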
@@ -192,6 +192,20 @@ class CSVOptions(
*/
val emptyValueInWrite = emptyValue.getOrElse("\"\"")

/**
* A string between two consecutive CSV records.
*/
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
require(sep.length == 1, "'lineSep' can contain only 1 character.")

I currently have a project where we are importing CSV files with Windows newlines (CRLF).

I backported these changes but ran into an issue with this check, because to properly parse the Windows CSV files I must be able to set "\r\n" for lineSep in the settings.

It appears the reason this require was added no longer applies, as asReaderSettings/asWriterSettings never call that function anymore.

I was able to remove this assert and can now import the Windows-newline CSV files into DataFrames properly.

Another issue I had before this was that the very last column would always get a "\r" appended to its name, so something like "TEXT" would become "TEXT\r", and we would no longer be able to query the TEXT column. Setting lineSep to "\r\n" solved this issue as well.

Member Author
@MaxGekk MaxGekk Apr 5, 2019

> I must be able to set "\r\n" for lineSep in the settings.

You don't need to set \r\n as lineSep to split the input by lines, because the Hadoop line reader can detect \r\n by itself. In which mode do you parse the CSV files: per-line (multiLine = false) or multiline?


I am setting multiLine = "true".

The problem I am having with this is that the column name of the last column in the CSV header gets a \r appended to the end of it.

So if I have

name,age,text\r\nfred,30,"likes\r\npie,cookies,milk"\njill,30,"likes\ncake,cookies,milk"\r\n

I was getting a schema with

StringType("NAME")
IntegerType("AGE")
StringType("TEXT\r")

Could it be the mixed use of \r\n and \n, so that it only wants to use \n for newlines?

Another issue: the lineSep configuration is controlled upstream by a separate configuration, provided by users who have no knowledge of Spark but know how they formatted their CSV files. Without some re-architecture, it is not possible to detect that this setting is \r\n and then set it to None for the CSVOptions.

lineSeparator.foreach(format.setLineSeparator) already handles 1 to 2 characters, so I figured this is a safe thing to support for the lineSep configuration, no?

Member

For multiLine = true, we have fixed the auto-detection feature in CSV (see #22503). That will do the job.

@thadeusb thadeusb Apr 8, 2019

That is taken care of in this PR by the following line, which I backported, no?

settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)

I am still having the issue that univocity keeps a \r in the column name when multiLine is set to true and lineSeparatorInRead is unset.

The only way I seem to be able to get Spark to not put a \r in the column name is to explicitly set the lineSep option to the two-character \r\n. Then I get a normal set of column names and everything else parses correctly.

I'm wondering if this is just some really pedantic CSV file that I'm working with. It's a CSV exported upstream by the pandas to_csv function with no extra arguments set.

Member

Would you be able to test against the master branch and file a JIRA if the issue persists?

sep
}

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes(charset)
}
val lineSeparatorInWrite: Option[String] = lineSeparator

def asWriterSettings: CsvWriterSettings = {
val writerSettings = new CsvWriterSettings()
val format = writerSettings.getFormat
@@ -200,6 +214,8 @@ class CSVOptions(
format.setQuoteEscape(escape)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)
lineSeparatorInWrite.foreach(format.setLineSeparator)

writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
writerSettings.setNullValue(nullValue)
@@ -216,8 +232,10 @@ class CSVOptions(
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
lineSeparator.foreach(format.setLineSeparator)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)

settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
settings.setReadInputOnSeparateThread(false)
@@ -227,7 +245,10 @@
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings.setLineSeparatorDetectionEnabled(multiLine == true)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
Member

nice!

settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
}

settings
}
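To make the effect of these settings concrete, a hedged sketch from the Python API (the temporary path is illustrative; the sample data mirrors the new default-separator test added below):

>>> data = '"a",1\r"c",2\r\n"d",3\n'
>>> with open('/tmp/mixed_endings.csv', 'w', newline='') as f:
...     _ = f.write(data)
>>> # no lineSep set: the Hadoop line reader treats \r, \r\n and \n as record ends
>>> spark.read.csv('/tmp/mixed_endings.csv').count()
3
>>> # an explicit single-character lineSep is passed through to Hadoop/univocity as-is,
>>> # and line-separator auto-detection is then disabled for the multiLine path
>>> df_cr = spark.read.option('lineSep', '\r').csv('/tmp/mixed_endings.csv')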
@@ -609,6 +609,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing. Maximum length is 1 character.</li>
* </ul>
*
* @since 2.0.0
@@ -658,6 +658,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* whitespaces from values being written should be skipped.</li>
* <li>`ignoreTrailingWhiteSpace` (default `true`): a flag indicating whether or not
* trailing whitespaces from values being written should be skipped.</li>
* <li>`lineSep` (default `\n`): defines the line separator that should be used for writing.
* Maximum length is 1 character.</li>
* </ul>
*
* @since 2.0.0
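A hedged round-trip sketch of the `lineSep` reader and writer options documented above, shown from the Python API; the output directory is illustrative, and 0x1E is one of the separators exercised in the new tests.

>>> spark.range(3).selectExpr("CAST(id AS STRING) AS value") \
...     .write.csv('/tmp/rs_roundtrip', mode='overwrite', lineSep='\x1e')
>>> spark.read.csv('/tmp/rs_roundtrip', lineSep='\x1e').count()
3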
@@ -95,7 +95,7 @@ object TextInputCSVDataSource extends CSVDataSource {
headerChecker: CSVHeaderChecker,
requiredSchema: StructType): Iterator[InternalRow] = {
val lines = {
val linesReader = new HadoopFileLinesReader(file, conf)
val linesReader = new HadoopFileLinesReader(file, parser.options.lineSeparatorInRead, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => linesReader.close()))
linesReader.map { line =>
new String(line.getBytes, 0, line.getLength, parser.options.charset)
@@ -377,6 +377,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing. Maximum length is 1 character.</li>
* </ul>
*
* @since 2.0.0
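And the streaming-reader counterpart, as a brief sketch (the input directory is a placeholder, `import tempfile` is assumed, and the DDL schema string is illustrative); omitting `lineSep` keeps the default where \r, \r\n and \n all end a record.

>>> sdf = spark.readStream.csv(tempfile.mkdtemp(), schema="value STRING", lineSep='\x1e')
>>> sdf.isStreaming
True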
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources.csv

import java.io.File
import java.nio.charset.{Charset, UnsupportedCharsetException}
import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
import java.nio.file.Files
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
@@ -33,7 +33,7 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.log4j.{AppenderSkeleton, LogManager}
import org.apache.log4j.spi.LoggingEvent

import org.apache.spark.SparkException
import org.apache.spark.{SparkException, TestUtils}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
@@ -1880,4 +1880,110 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
}
}
}

test("""Support line separator - default value \r, \r\n and \n""") {
val data = "\"a\",1\r\"c\",2\r\n\"d\",3\n"

withTempPath { path =>
Files.write(path.toPath, data.getBytes(StandardCharsets.UTF_8))
val df = spark.read.option("inferSchema", true).csv(path.getAbsolutePath)
val expectedSchema =
StructType(StructField("_c0", StringType) :: StructField("_c1", IntegerType) :: Nil)
checkAnswer(df, Seq(("a", 1), ("c", 2), ("d", 3)).toDF())
assert(df.schema === expectedSchema)
}
}

def testLineSeparator(lineSep: String, encoding: String, inferSchema: Boolean, id: Int): Unit = {
test(s"Support line separator in ${encoding} #${id}") {
// Read
val data =
s""""a",1$lineSep
|c,2$lineSep"
|d",3""".stripMargin
val dataWithTrailingLineSep = s"$data$lineSep"

Seq(data, dataWithTrailingLineSep).foreach { lines =>
withTempPath { path =>
Files.write(path.toPath, lines.getBytes(encoding))
val schema = StructType(StructField("_c0", StringType)
:: StructField("_c1", LongType) :: Nil)

val expected = Seq(("a", 1), ("\nc", 2), ("\nd", 3))
.toDF("_c0", "_c1")
Seq(false, true).foreach { multiLine =>
val reader = spark
.read
.option("lineSep", lineSep)
.option("multiLine", multiLine)
.option("encoding", encoding)
val df = if (inferSchema) {
reader.option("inferSchema", true).csv(path.getAbsolutePath)
} else {
reader.schema(schema).csv(path.getAbsolutePath)
}
checkAnswer(df, expected)
}
}
}

// Write
withTempPath { path =>
Seq("a", "b", "c").toDF("value").coalesce(1)
.write
.option("lineSep", lineSep)
.option("encoding", encoding)
.csv(path.getAbsolutePath)
val partFile = TestUtils.recursiveList(path).filter(f => f.getName.startsWith("part-")).head
val readBack = new String(Files.readAllBytes(partFile.toPath), encoding)
assert(
readBack === s"a${lineSep}b${lineSep}c${lineSep}")
}

// Roundtrip
withTempPath { path =>
val df = Seq("a", "b", "c").toDF()
df.write
.option("lineSep", lineSep)
.option("encoding", encoding)
.csv(path.getAbsolutePath)
val readBack = spark
.read
.option("lineSep", lineSep)
.option("encoding", encoding)
.csv(path.getAbsolutePath)
checkAnswer(df, readBack)
}
}
}

// scalastyle:off nonascii
List(
(0, "|", "UTF-8", false),
(1, "^", "UTF-16BE", true),
(2, ":", "ISO-8859-1", true),
(3, "!", "UTF-32LE", false),
(4, 0x1E.toChar.toString, "UTF-8", true),
(5, "아", "UTF-32BE", false),
(6, "у", "CP1251", true),
(8, "\r", "UTF-16LE", true),
(9, "\u000d", "UTF-32BE", false),
(10, "=", "US-ASCII", false),
(11, "$", "utf-32le", true)
).foreach { case (testNum, sep, encoding, inferSchema) =>
testLineSeparator(sep, encoding, inferSchema, testNum)
}
// scalastyle:on nonascii

test("lineSep restrictions") {
val errMsg1 = intercept[IllegalArgumentException] {
spark.read.option("lineSep", "").csv(testFile(carsFile)).collect
}.getMessage
assert(errMsg1.contains("'lineSep' cannot be an empty string"))

val errMsg2 = intercept[IllegalArgumentException] {
spark.read.option("lineSep", "123").csv(testFile(carsFile)).collect
}.getMessage
assert(errMsg2.contains("'lineSep' can contain only 1 character"))
}
}