[SPARK-25393][SQL] Adding new function from_csv() #22379
Changes from all commits
@@ -103,6 +103,12 @@
       <groupId>commons-codec</groupId>
       <artifactId>commons-codec</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>2.7.3</version>
+      <type>jar</type>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

Reviewer comment: Hi, @MaxGekk, @gatorsmile, and @cloud-fan. I know this is the same approach with

Reviewer comment: I added the dependency only because I have to move

Reviewer comment: Ideally we should just make the CSV code a separate external module, and that would be the right way given the discussion. The current change isn't necessarily blocked, but I can see the point that moving the dependency makes further refactoring potentially harder, as pointed out. Many people seem to have agreed on separating them. The concern here is that it sounds like we are stepping back from the ideal approach.
@@ -520,7 +520,10 @@ object FunctionRegistry {
     castAlias("date", DateType),
     castAlias("timestamp", TimestampType),
     castAlias("binary", BinaryType),
-    castAlias("string", StringType)
+    castAlias("string", StringType),
+
+    // csv
+    expression[CsvToStructs]("from_csv")
   )

   val builtin: SimpleFunctionRegistry = {

Reviewer comment: So the only reason to move CSV functionality to catalyst is to register it as a built-in function?

Reviewer comment: Yea, it looks so. That's actually the concern in #22379 (comment), and here's my opinion on that: #22379 (comment) and #22379 (comment)

Reviewer comment: This sounds like a reasonable change. cc @rxin
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.csv

object CSVExprUtils {
  /**
   * Filters ignorable rows for the CSV iterator (lines that are empty or start with the
   * `comment` character). This is currently used in the CSV reading path and in CSV
   * schema inference.
   */
  def filterCommentAndEmpty(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
    iter.filter { line =>
      line.trim.nonEmpty && !line.startsWith(options.comment.toString)
    }
  }

  def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
    if (options.isCommentSet) {
      val commentPrefix = options.comment.toString
      iter.dropWhile { line =>
        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
      }
    } else {
      iter.dropWhile(_.trim.isEmpty)
    }
  }

  /**
   * Extracts the header and moves the iterator forward so that only data remains in it.
   */
  def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
    val nonEmptyLines = skipComments(iter, options)
    if (nonEmptyLines.hasNext) {
      Some(nonEmptyLines.next())
    } else {
      None
    }
  }

  /**
   * Helper method that converts a string representation of a character to the actual
   * character. It handles some Java escaped strings and throws an exception if the given
   * string is longer than one character.
   */
  @throws[IllegalArgumentException]
  def toChar(str: String): Char = {
    (str: Seq[Char]) match {
      case Seq() => throw new IllegalArgumentException("Delimiter cannot be empty string")
      case Seq('\\') => throw new IllegalArgumentException("Single backslash is prohibited." +
        " It has special meaning as beginning of an escape sequence." +
        " To get the backslash character, pass a string with two backslashes as the delimiter.")
      case Seq(c) => c
      case Seq('\\', 't') => '\t'
      case Seq('\\', 'r') => '\r'
      case Seq('\\', 'b') => '\b'
      case Seq('\\', 'f') => '\f'
      // In case user changes quote char and uses \" as delimiter in options
      case Seq('\\', '\"') => '\"'
      case Seq('\\', '\'') => '\''
      case Seq('\\', '\\') => '\\'
      case _ if str == """\u0000""" => '\u0000'
      case Seq('\\', _) =>
        throw new IllegalArgumentException(s"Unsupported special character for delimiter: $str")
      case _ =>
        throw new IllegalArgumentException(s"Delimiter cannot be more than one character: $str")
    }
  }
}
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
import org.apache.spark.sql.types.{MapType, StringType, StructType}

object ExprUtils {

  def evalSchemaExpr(exp: Expression): StructType = exp match {
    case Literal(s, StringType) => StructType.fromDDL(s.toString)
    case e => throw new AnalysisException(
      s"Schema should be specified in DDL format as a string literal instead of ${e.sql}")
  }

  def convertToMapData(exp: Expression): Map[String, String] = exp match {
    case m: CreateMap
      if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
      val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
      ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
        key.toString -> value.toString
      }
    case m: CreateMap =>
      throw new AnalysisException(
        s"A type of keys and values in map() must be string, but got ${m.dataType.catalogString}")
    case _ =>
      throw new AnalysisException("Must use a map() function for options")
  }
}

Reviewer comment: The same comment in this object

Reviewer comment: Can we make

Reviewer comment: The difference between the two functions appeared when I modified