[SPARK-26108][SQL] Support custom lineSep in CSV datasource #23080

Closed
wants to merge 17 commits into from

Conversation

MaxGekk
Member

@MaxGekk MaxGekk commented Nov 18, 2018

What changes were proposed in this pull request?

In this PR, I propose a new option for the CSV datasource, lineSep, similar to the Text and JSON datasources. The option allows specifying a custom line separator of at most 2 characters (because of a restriction in the uniVocity parser). The new option can be used when reading and writing CSV files.

How was this patch tested?

Added a few tests with a custom lineSep for enabled/disabled multiLine in the read path, as well as tests in the write path. I also added roundtrip tests.
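To make the proposal concrete, here is a pure-Scala sketch of what a custom lineSep means for record splitting. This is not Spark's actual code path (which goes through Hadoop's line reader and the uniVocity parser); the '|' separator and the sample data are invented for illustration:

```scala
// Hypothetical input where records are delimited by '|' instead of '\n'.
val csvText = "a,1|b,2|c,3"

// With .option("lineSep", "|"), Spark treats each '|'-delimited chunk as
// one CSV record; the comma still separates fields within a record.
// split takes a regex, so '|' must be escaped.
val records = csvText.split("\\|").toSeq
val rows = records.map(_.split(",").toSeq)

assert(rows == Seq(Seq("a", "1"), Seq("b", "2"), Seq("c", "3")))
```

With the real API added by this PR, the equivalent read would be `spark.read.option("lineSep", "|").csv(path)`.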

@MaxGekk
Member Author

MaxGekk commented Nov 18, 2018

@HyukjinKwon Could you look at the PR, please.

@SparkQA

SparkQA commented Nov 18, 2018

Test build #98979 has finished for PR 23080 at commit 12022ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Nov 18, 2018

jenkins, retest this, please

@SparkQA

SparkQA commented Nov 18, 2018

Test build #98980 has finished for PR 23080 at commit 12022ad.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Nov 18, 2018

jenkins, retest this, please

@SparkQA

SparkQA commented Nov 18, 2018

Test build #98982 has finished for PR 23080 at commit 12022ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
require(sep.length <= 2, "'lineSep' can contain 1 or 2 characters.")
Member

@MaxGekk, might not be a super big deal but I believe this should be counted after converting it into UTF-8.

Member

@HyukjinKwon HyukjinKwon Nov 19, 2018

We could say the line separator should be 1 or 2 bytes (UTF-8) in read path specifically when multiline is enabled.
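The distinction matters because String.length counts UTF-16 code units, not UTF-8 bytes; a quick illustration (U+2028 LINE SEPARATOR is just an example character, not one the PR uses):

```scala
// U+2028 LINE SEPARATOR: one character in a Scala String...
val unicodeSep = "\u2028"
assert(unicodeSep.length == 1)

// ...but three bytes once encoded as UTF-8, so a character-based limit
// and a byte-based limit disagree for non-ASCII separators.
assert(unicodeSep.getBytes("UTF-8").length == 3)
```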

Member

Hm, I see.

}

val lineSeparatorInRead: Option[Array[Byte]] = lineSeparator.map { lineSep =>
lineSep.getBytes("UTF-8")
Member

@MaxGekk, CSV's multiline does not support encoding but I think normal mode supports encoding. It should be okay to get bytes from it. We can just throw an exception when multiline is enabled.

@HyukjinKwon
Member

Ah, also, CsvParser.beginParsing takes an additional Charset argument, so it should be fairly easy to support encoding in multiLine. @MaxGekk, would you be able to find some time to work on it? If that change makes the current PR easier, we can merge it first.

@MaxGekk
Member Author

MaxGekk commented Nov 19, 2018

would you be able to find some time to work on it? If that change makes the current PR easier, we can merge it first.

I will try

@HyukjinKwon
Member

@MaxGekk, let's rebase this one accordingly with encoding support.

@SparkQA

SparkQA commented Nov 21, 2018

Test build #99105 has finished for PR 23080 at commit bb8a13b.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk
Member Author

MaxGekk commented Nov 21, 2018

jenkins, retest this, please

@SparkQA

SparkQA commented Nov 21, 2018

Test build #99117 has finished for PR 23080 at commit 1f5399f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 21, 2018

Test build #99122 has finished for PR 23080 at commit 1f5399f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -216,8 +232,13 @@ class CSVOptions(
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
lineSeparator.foreach { sep =>
  format.setLineSeparator(sep)
  format.setNormalizedNewline(0x00.toChar)
}
Member

I know we have some problems here when setting newlines of more than 1 character, because setNormalizedNewline only supports one character.

This is related with #18581 (comment) and uniVocity/univocity-parsers#170

That's why I thought we can only support this for single character for now.

Member Author

That's why I thought we can only support this for single character for now.

OK. I will restrict line separators to one character.
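The tightened check could look like the following sketch (validateLineSep is an illustrative name, not the PR's actual code, which lives inside CSVOptions):

```scala
import scala.util.Try

// Restrict the separator to exactly one character, mirroring the
// final requires in CSVOptions.
def validateLineSep(sep: String): String = {
  require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
  require(sep.length == 1, "'lineSep' can contain only 1 character.")
  sep
}

assert(validateLineSep("|") == "|")
assert(Try(validateLineSep("\r\n")).isFailure)  // two characters rejected
assert(Try(validateLineSep("")).isFailure)      // empty string rejected
```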

@@ -227,7 +248,10 @@ class CSVOptions(
settings.setEmptyValue(emptyValueInRead)
settings.setMaxCharsPerColumn(maxCharsPerColumn)
settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
settings.setLineSeparatorDetectionEnabled(multiLine == true)
settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
lineSeparatorInRead.foreach { _ =>
Member

nice!

@SparkQA

SparkQA commented Nov 22, 2018

Test build #99181 has finished for PR 23080 at commit 918d163.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -377,6 +377,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
* <li>`locale` (default is `en-US`): sets a locale as language tag in IETF BCP 47 format.
* For instance, this is used while parsing dates and timestamps.</li>
* <li>`lineSep` (default covers all `\r`, `\r\n` and `\n`): defines the line separator
* that should be used for parsing. Maximum length is 2.</li>
Member

I'm sorry, can you fix `Maximum length is 2` as well? Then this should be good to go.

@SparkQA

SparkQA commented Nov 23, 2018

Test build #99210 has finished for PR 23080 at commit a4c4b67.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

Last changes were only doc changes. Let me get this in.

@HyukjinKwon
Member

Merged to master.

@HyukjinKwon
Member

@MaxGekk, thanks for working on this one.

@asfgit asfgit closed this in 8e8d117 Nov 23, 2018
@SparkQA

SparkQA commented Nov 23, 2018

Test build #99215 has finished for PR 23080 at commit a4c4b67.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pooja-murarka

I am testing lineSep with spark 2.4

data.csv : "a",1 "c",2 "d",3
val schema: StructType =
  StructType(
    Seq(
      StructField(name = "dteday", dataType = StringType),
      StructField(name = "hr", dataType = IntegerType)
    )
  )
val logData = spark.read.format("csv").schema(schema).option("lineSep", "\t").load("data.csv")
But can only see schema without any data.
scala> logData.show()
+------+----+
|dteday| hr|
+------+----+
| null|null|
+------+----+

Can you please suggest whether I missed something, or whether the above fix has not been merged into this branch?
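A guess at what is happening here (hedged, since Spark 2.4 simply does not support the option, as confirmed below): if the file's records are actually newline-separated, splitting on "\t" yields one malformed record, which permissive CSV parsing reports as a row of nulls. A plain-Scala sketch of that mismatch, with invented sample data:

```scala
// Newline-separated data, but we try to split records on tabs.
val fileText = "\"a\",1\n\"c\",2\n\"d\",3"
val records = fileText.split("\t")
assert(records.length == 1)        // no tabs, so the whole file is one "record"

val fields = records.head.split(",")
assert(fields.length != 2)         // the record cannot fit a 2-column schema
```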

@HyukjinKwon
Member

It's fixed in upcoming Spark. Spark 2.4 does not support it.

jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019

Closes apache#23080 from MaxGekk/csv-line-sep.

Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
val lineSeparator: Option[String] = parameters.get("lineSep").map { sep =>
require(sep.nonEmpty, "'lineSep' cannot be an empty string.")
require(sep.length == 1, "'lineSep' can contain only 1 character.")

I currently have a project where we are importing windows newlines CRLF from CSV files.

I backported these changes but ran into an issue with this check, because to properly parse Windows CSV files I must be able to set "\r\n" for lineSep in the settings.

It appears the require is no longer needed: the code in asReaderSettings/asWriterSettings never calls that function anymore.

I was able to remove this assert and can now import the Windows-newline CSV files into dataframes properly.

Another issue I had before this change: the very last column name would always get a "\r" appended, so something like "TEXT" would become "TEXT\r", and we could no longer query the TEXT column. Setting lineSep to "\r\n" solved this as well.

Member Author

@MaxGekk MaxGekk Apr 5, 2019

I must be able to set "\r\n" for lineSep in the settings.

You don't need to set \r\n as lineSep to split the input by lines, because the Hadoop line reader can detect \r\n itself. In which mode do you parse the CSV files: per-line (multiLine = false) or multiLine?


I am setting multiLine = "true".

The problem I am having with this is the column name of the last column in the CSV header gets a \r added to the end of it.

So if I have

name,age,text\r\nfred,30,"likes\r\npie,cookies,milk"\njill,30,"likes\ncake,cookies,milk"\r\n

I was getting schema with

StringType("NAME")
IntegerType("AGE")
StringType("TEXT\r")

Could it be the mixed use of \r\n and \n so it only wants to use \n for newlines?

Another issue is that the lineSep configuration is controlled upstream by a different configuration, provided by users who have no knowledge of Spark but who know how they formatted their CSV files; without some re-architecture, it is not possible to detect that this setting is \r\n and then reset it to None for CSVOptions.

lineSeparator.foreach(format.setLineSeparator) already handles 1 to 2 characters, so I figured this is a safe thing to support for the lineSep configuration, no?
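The trailing-\r symptom described above can be reproduced in plain Scala, independent of Spark or uniVocity (sample data invented for illustration):

```scala
// CRLF-terminated CSV text, as produced by many Windows tools.
val raw = "name,age,text\r\nfred,30,pie\r\n"

// Splitting on "\n" alone leaves the carriage return glued to the
// last header column...
val badHeader = raw.split("\n").head.split(",")
assert(badHeader.last == "text\r")

// ...while splitting on the full "\r\n" separator yields clean names.
val goodHeader = raw.split("\r\n").head.split(",")
assert(goodHeader.last == "text")
```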

Member

For multiline true, we have fixed auto-multiline detect feature in CSV (see #22503) That will do the job.

@thadeusb thadeusb Apr 8, 2019

That is taken care of by the following line that I backported, no?

settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)

I am still having the issue that univocity keeps a \r in the column name with multiLine set to true and lineSeparatorInRead unset.

The only way I can get Spark to not put a \r in the column name is to specify the lineSep option explicitly as the two characters \r\n. Then I get a normal set of column names and everything else parses correctly.

I'm wondering if this is just a really pedantic CSV file I'm working with? It's a CSV exported upstream by Python's pandas.to_csv function with no extra arguments set.

Member

Would you be able to file a JIRA, after testing against the master branch, if the issue persists?

@MaxGekk MaxGekk deleted the csv-line-sep branch August 17, 2019 13:33
@don4of4

don4of4 commented Dec 5, 2019

Is this feature in version 3.0? If not, when can we expect it?

@MaxGekk
Member Author

MaxGekk commented Dec 5, 2019

Is this feature in version 3.0?

@don4of4 It should be in 3.0.

@lamduynguyen

@HyukjinKwon is there any plan to support longer line separators?

@HyukjinKwon
Member

That's blocked by a limitation of the Univocity library. You should ask there first.

@@ -922,6 +925,8 @@ def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=No
the default UTF-8 charset will be used.
:param emptyValue: sets the string representation of an empty value. If None is set, it uses
the default value, ``""``.
:param lineSep: defines the line separator that should be used for writing. If None is
set, it uses the default value, ``\\n``. Maximum length is 1 character.


Not sure if I'm missing something, but has this removed the ability use \r\n?

Member

Spark never supported \r\n in the write path.


Revisiting this since I'd like to get rid of a local patch.

Why do you say it doesn't support this?

Reverting to the 2 character restriction works in my testing, on both the read and write paths and using arbitrary two character delimiters.


Sorry for the extra comments: hadn't read deeply enough.

So the problem is Univocity's normalizedNewline handling? It fails in multiLine cases? That's what I'm seeing in the tests, and it would explain why I don't see it in my use cases.

If that's the case, wondering if it's okay to allow two characters for the non-multiline cases?

Member

Please file a JIRA and go ahead if you can.


ack
