Commit c038aaa
Merging the master branch
1 parent 38edcfb

7 files changed: +34 -34 lines
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVHeaderChecker.scala

Lines changed: 3 additions & 3 deletions

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.csv
+package org.apache.spark.sql.catalyst.csv
 
 import com.univocity.parsers.csv.CsvParser
 
@@ -107,7 +107,7 @@ class CSVHeaderChecker(
   }
 
   // This is currently only used to parse CSV with multiLine mode.
-  private[csv] def checkHeaderColumnNames(tokenizer: CsvParser): Unit = {
+  private[sql] def checkHeaderColumnNames(tokenizer: CsvParser): Unit = {
     assert(options.multiLine, "This method should be executed with multiLine.")
     if (options.headerFlag) {
       val firstRecord = tokenizer.parseNext()
@@ -116,7 +116,7 @@ class CSVHeaderChecker(
   }
 
   // This is currently only used to parse CSV with non-multiLine mode.
-  private[csv] def checkHeaderColumnNames(lines: Iterator[String], tokenizer: CsvParser): Unit = {
+  private[sql] def checkHeaderColumnNames(lines: Iterator[String], tokenizer: CsvParser): Unit = {
     assert(!options.multiLine, "This method should not be executed with multiline.")
     // Checking that column names in the header are matched to field names of the schema.
     // The header will be removed from lines.
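The widening from private[csv] to private[sql] is the point of this hunk: it lets callers elsewhere under org.apache.spark.sql (such as DataFrameReader in sql/core, whose imports change below) invoke the checker directly. A minimal sketch of such a caller, assuming only the two method signatures shown above; the surrounding object is illustrative and not part of this commit:

package org.apache.spark.sql

import com.univocity.parsers.csv.CsvParser
import org.apache.spark.sql.catalyst.csv.CSVHeaderChecker

// Hypothetical caller: it must live under org.apache.spark.sql for the
// private[sql] methods to be visible.
object HeaderCheckSketch {

  // Non-multiLine path: the checker reads the header line out of `lines`
  // and matches its column names against the expected schema.
  def checkLineBased(checker: CSVHeaderChecker, lines: Iterator[String], tokenizer: CsvParser): Unit = {
    checker.checkHeaderColumnNames(lines, tokenizer)
  }

  // multiLine path: the header comes from the tokenizer itself.
  def checkMultiLine(checker: CSVHeaderChecker, tokenizer: CsvParser): Unit = {
    checker.checkHeaderColumnNames(tokenizer)
  }
}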

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVUtils.scala

Lines changed: 24 additions & 0 deletions

@@ -18,6 +18,30 @@
 package org.apache.spark.sql.catalyst.csv
 
 object CSVUtils {
+
+  def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
+    if (options.isCommentSet) {
+      val commentPrefix = options.comment.toString
+      iter.dropWhile { line =>
+        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
+      }
+    } else {
+      iter.dropWhile(_.trim.isEmpty)
+    }
+  }
+
+  /**
+   * Extracts header and moves iterator forward so that only data remains in it
+   */
+  def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
+    val nonEmptyLines = skipComments(iter, options)
+    if (nonEmptyLines.hasNext) {
+      Some(nonEmptyLines.next())
+    } else {
+      None
+    }
+  }
+
   /**
    * Helper method that converts string representation of a character to actual character.
    * It handles some Java escaped strings and throws exception if given string is longer than one
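A standalone sketch of how the two moved helpers behave. Constructing a real CSVOptions is internal API, so a hypothetical stub exposing only the two members that skipComments reads (isCommentSet and comment) stands in for it; the helper logic is copied from the hunk above:

object ExtractHeaderDemo {

  // Stand-in for CSVOptions (assumption: only these two members matter here).
  final case class StubOptions(comment: Char) {
    def isCommentSet: Boolean = comment != '\u0000'
  }

  // Same logic as the moved CSVUtils.skipComments, typed against the stub:
  // drops leading blank lines, and leading comment lines if a comment char is set.
  def skipComments(iter: Iterator[String], options: StubOptions): Iterator[String] = {
    if (options.isCommentSet) {
      val commentPrefix = options.comment.toString
      iter.dropWhile { line =>
        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
      }
    } else {
      iter.dropWhile(_.trim.isEmpty)
    }
  }

  def main(args: Array[String]): Unit = {
    val lines = Iterator("# generated file", "", "id,name", "1,a", "2,b")
    val data = skipComments(lines, StubOptions('#'))
    // What extractHeader returns: the first line after comments and blanks.
    val header = if (data.hasNext) Some(data.next()) else None
    println(header)      // Some(id,name)
    println(data.toList) // List(1,a, 2,b) -- only data rows remain
  }
}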

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 1 addition & 2 deletions

@@ -22,14 +22,13 @@ import java.util.{Locale, Properties}
 import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.Partition
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions}
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.{CSVInferSchema, CSVOptions}
+import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVInferSchema, CSVOptions}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources._

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala

Lines changed: 3 additions & 26 deletions

@@ -68,32 +68,9 @@ object CSVUtils {
     }
   }
 
-  def skipComments(iter: Iterator[String], options: CSVOptions): Iterator[String] = {
-    if (options.isCommentSet) {
-      val commentPrefix = options.comment.toString
-      iter.dropWhile { line =>
-        line.trim.isEmpty || line.trim.startsWith(commentPrefix)
-      }
-    } else {
-      iter.dropWhile(_.trim.isEmpty)
-    }
-  }
-
-  /**
-   * Extracts header and moves iterator forward so that only data remains in it
-   */
-  def extractHeader(iter: Iterator[String], options: CSVOptions): Option[String] = {
-    val nonEmptyLines = skipComments(iter, options)
-    if (nonEmptyLines.hasNext) {
-      Some(nonEmptyLines.next())
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Generates a header from the given row which is null-safe and duplicate-safe.
-   */
+  /**
+   * Generates a header from the given row which is null-safe and duplicate-safe.
+   */
   def makeSafeHeader(
       row: Array[String],
       caseSensitive: Boolean,

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ import com.univocity.parsers.csv.CsvParser
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.csv.CSVOptions
+import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions}
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
 import org.apache.spark.sql.execution.datasources.FailureSafeParser
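Taken together, the four import changes (DataFrameReader, CSVDataSource, CSVFileFormat, UnivocityParser) trace the shape of the refactor: header validation now lives in catalyst as CSVHeaderChecker, and the sql/core call sites construct a checker and run it before handing lines to the row parser. A sketch of that hand-off; the CSVHeaderChecker constructor arguments (schema, options, a source tag) are assumed, since this diff only shows the two method signatures:

package org.apache.spark.sql.execution.datasources.csv

import com.univocity.parsers.csv.CsvParser
import org.apache.spark.sql.catalyst.csv.{CSVHeaderChecker, CSVOptions}
import org.apache.spark.sql.types.StructType

object ParseWithHeaderCheck {

  def parse(
      lines: Iterator[String],
      tokenizer: CsvParser,
      schema: StructType,
      options: CSVOptions): Unit = {
    // Assumed constructor shape -- not shown in this commit.
    val checker = new CSVHeaderChecker(schema, options, "CSV source")
    // Validate (and consume) the header line before parsing data rows.
    checker.checkHeaderColumnNames(lines, tokenizer)
    // ... feed the remaining `lines` to the row parser ...
  }
}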
