Commit de36f65

Henry Robinson authored and HyukjinKwon committed
[SPARK-23148][SQL] Allow pathnames with special characters for CSV / JSON / text
## What changes were proposed in this pull request?

Fix for JSON and CSV data sources when file names include characters that would be changed by URL encoding.

## How was this patch tested?

New unit tests for JSON, CSV and text suites.

Author: Henry Robinson <henry@cloudera.com>

Closes #20355 from henryr/spark-23148.
1 parent 7af1a32 commit de36f65
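For context, a minimal sketch (not part of the patch) of why the old String-based API mishandled such paths. The encoded string below is illustrative: PartitionedFile.filePath carries a URI-encoded path, so building a Hadoop Path directly from that string keeps the percent-escapes as literal characters, while parsing it as a java.net.URI first recovers the on-disk name, which is what the changes below do.

import java.net.URI

import org.apache.hadoop.fs.Path

object PathEncodingSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical URI-encoded string, as PartitionedFile.filePath would report
    // a file named "sp&cial%c hars" ('%' encoded as %25, space as %20).
    val encoded = "file:/tmp/sp&cial%25c%20hars"

    // Old code path: new Path(String) keeps the escapes as literal characters,
    // so the resulting Path names a file that does not exist on disk.
    val literal = new Path(encoded)

    // New code path: parsing the string as a URI first decodes the escapes,
    // recovering the actual on-disk name.
    val decoded = new Path(new URI(encoded))

    println(literal)  // file:/tmp/sp&cial%25c%20hars
    println(decoded)  // file:/tmp/sp&cial%c hars
  }
}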

4 files changed: +31 −14 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CodecStreams.scala

Lines changed: 3 additions & 3 deletions
@@ -45,11 +45,11 @@ object CodecStreams {
   }
 
   /**
-   * Creates an input stream from the string path and add a closure for the input stream to be
+   * Creates an input stream from the given path and add a closure for the input stream to be
    * closed on task completion.
    */
-  def createInputStreamWithCloseResource(config: Configuration, path: String): InputStream = {
-    val inputStream = createInputStream(config, new Path(path))
+  def createInputStreamWithCloseResource(config: Configuration, path: Path): InputStream = {
+    val inputStream = createInputStream(config, path)
     Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => inputStream.close()))
     inputStream
   }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala

Lines changed: 6 additions & 5 deletions
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources.csv
 
+import java.net.URI
 import java.nio.charset.{Charset, StandardCharsets}
 
 import com.univocity.parsers.csv.CsvParser
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.Job
@@ -32,7 +33,6 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -206,7 +206,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parser: UnivocityParser,
       schema: StructType): Iterator[InternalRow] = {
     UnivocityParser.parseStream(
-      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath),
+      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))),
       parser.options.headerFlag,
       parser,
       schema)
@@ -218,8 +218,9 @@ object MultiLineCSVDataSource extends CSVDataSource {
       parsedOptions: CSVOptions): StructType = {
     val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
     csv.flatMap { lines =>
+      val path = new Path(lines.getPath())
       UnivocityParser.tokenizeStream(
-        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, lines.getPath()),
+        CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
         shouldDropHeader = false,
         new CsvParser(parsedOptions.asParserSettings))
     }.take(1).headOption match {
@@ -230,7 +231,7 @@ object MultiLineCSVDataSource extends CSVDataSource {
          UnivocityParser.tokenizeStream(
            CodecStreams.createInputStreamWithCloseResource(
              lines.getConfiguration,
-             lines.getPath()),
+             new Path(lines.getPath())),
            parsedOptions.headerFlag,
            new CsvParser(parsedOptions.asParserSettings))
        }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala

Lines changed: 6 additions & 4 deletions
@@ -18,11 +18,12 @@
 package org.apache.spark.sql.execution.datasources.json
 
 import java.io.InputStream
+import java.net.URI
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
@@ -168,9 +169,10 @@ object MultiLineJsonDataSource extends JsonDataSource {
   }
 
   private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
+    val path = new Path(record.getPath())
     CreateJacksonParser.inputStream(
       jsonFactory,
-      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, record.getPath()))
+      CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
   }
 
   override def readFile(
@@ -180,7 +182,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       schema: StructType): Iterator[InternalRow] = {
     def partitionedFileString(ignored: Any): UTF8String = {
       Utils.tryWithResource {
-        CodecStreams.createInputStreamWithCloseResource(conf, file.filePath)
+        CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
       } { inputStream =>
         UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
       }
@@ -193,6 +195,6 @@ object MultiLineJsonDataSource extends JsonDataSource {
       parser.options.columnNameOfCorruptRecord)
 
     safeParser.parse(
-      CodecStreams.createInputStreamWithCloseResource(conf, file.filePath))
+      CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
   }
 }

sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala

Lines changed: 16 additions & 2 deletions
@@ -23,6 +23,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   import testImplicits._
 
   private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
+  private val nameWithSpecialChars = "sp&cial%c hars"
 
   allFileBasedDataSources.foreach { format =>
     test(s"Writing empty datasets should not fail - $format") {
@@ -54,7 +55,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
   // Only ORC/Parquet support this. `CSV` and `JSON` returns an empty schema.
   // `TEXT` data source always has a single column whose name is `value`.
   Seq("orc", "parquet").foreach { format =>
-    test(s"SPARK-15474 Write and read back non-emtpy schema with empty dataframe - $format") {
+    test(s"SPARK-15474 Write and read back non-empty schema with empty dataframe - $format") {
       withTempPath { file =>
         val path = file.getCanonicalPath
         val emptyDf = Seq((true, 1, "str")).toDF().limit(0)
@@ -69,7 +70,6 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
 
   allFileBasedDataSources.foreach { format =>
     test(s"SPARK-22146 read files containing special characters using $format") {
-      val nameWithSpecialChars = s"sp&cial%chars"
       withTempDir { dir =>
         val tmpFile = s"$dir/$nameWithSpecialChars"
         spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
@@ -78,4 +78,18 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  // Separate test case for formats that support multiLine as an option.
+  Seq("json", "csv").foreach { format =>
+    test("SPARK-23148 read files containing special characters " +
+      s"using $format with multiline enabled") {
+      withTempDir { dir =>
+        val tmpFile = s"$dir/$nameWithSpecialChars"
+        spark.createDataset(Seq("a", "b")).write.format(format).save(tmpFile)
+        val reader = spark.read.format(format).option("multiLine", true)
+        val fileContent = reader.load(tmpFile)
+        checkAnswer(fileContent, Seq(Row("a"), Row("b")))
+      }
+    }
+  }
 }
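For reference, a standalone sketch of the user-facing behaviour the new multiLine test exercises; the directory name, master setting, and application name below are illustrative and not taken from the patch.

import org.apache.spark.sql.SparkSession

object MultiLineSpecialCharsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")            // illustrative local run
      .appName("spark-23148-demo")
      .getOrCreate()
    import spark.implicits._

    // Output directory containing '&', '%' and a space, mirroring the test fixture.
    val dir = "/tmp/sp&cial%c hars"
    spark.createDataset(Seq("a", "b")).write.mode("overwrite").json(dir)

    // With this patch, the multiLine reader resolves the URI-encoded file paths
    // back to their on-disk names instead of failing to find the files.
    spark.read.option("multiLine", true).json(dir).show()

    spark.stop()
  }
}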
