[Spark-21996][SQL] read files with space in name for streaming #19247
Conversation
To handle file names with special characters, we should use the Javadoc here …
While this change fixes the specific issue raised, I did a search and found multiple places in Spark code where …
@Joseph-Torres @brkyvz @lw-lin can you please take a look? (sorry for uninvited mentions, but I just took the latest commits on …
@@ -233,7 +233,7 @@ class FileStreamSource(
     }

     val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
-      (status.getPath.toUri.toString, status.getModificationTime)
+      (status.getPath.toUri.getPath, status.getModificationTime)
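For context on this one-line change, here is a minimal sketch (not code from the PR) of how the two expressions differ once a file name contains a space; it only assumes hadoop-common on the classpath and an illustrative local path.

import org.apache.hadoop.fs.Path

object SpaceInNameDemo {
  def main(args: Array[String]): Unit = {
    val p = new Path("/data/file with space.txt")
    // toUri.toString keeps the percent-encoded form of the path:
    println(p.toUri.toString)  // /data/file%20with%20space.txt
    // toUri.getPath decodes it back to the raw path, space included:
    println(p.toUri.getPath)   // /data/file with space.txt
  }
}

The encoded and decoded forms are different literal strings, which is the kind of mismatch behind the skipped-file warning described in the PR summary.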
why not status.getPath.toString?
ping @xysun ^
The correct fix is to change this line: https://github.com/xysun/spark/blob/4f5979a72ce9cb36a3327e79b8592b9e42bdf5af/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L168
It should be files.map(new Path(new URI(_.path)).toString).
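A hedged sketch of what that suggested expression does, written out on a single illustrative value rather than over the files collection (the variable names are not from Spark):

import java.net.URI
import org.apache.hadoop.fs.Path

// An entry path as it might be stored in percent-encoded URI form (made-up value):
val stored = "file:/tmp/text%20text/part-00000.txt"
// Parse it as a URI first, then let Hadoop's Path render a readable path string again:
val decoded = new Path(new URI(stored)).toString  // file:/tmp/text text/part-00000.txt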
FYI, getPath will drop scheme and credentials.
Wait .. @zsxwing, do you mean getPath (org.apache.hadoop.fs.Path) from org.apache.hadoop.fs.FileStatus drops the scheme and credentials? Here it seems to be Seq[org.apache.hadoop.fs.FileStatus], while the code you pointed out looks like Array[org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry].
Ohaa, getPath from java.net.URI in the proposed fix drops them. Sure.
Thanks. I'll update the commit.
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
Hi @HyukjinKwon @zsxwing @mgaido91, I have updated the code according to the comments and also merged with the latest master. Please review. Thanks.
ok to test
Test build #86130 has finished for PR 19247 at commit …
retest this please
@@ -408,6 +420,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("SPARK-21996 read from text files -- file name has space") {
Can we run the same test also for the other input formats, i.e. parquet, orc, ...?
this test should be enough. The issue is in the file stream source.
Yes, for this PR it is, but it would be great if we could ensure that all the data sources have the same behavior... Maybe we can do this in another PR if you think that is better.
No. I meant it's not an issue of file formats. There is no format-specific code in the file stream source. If there should be any tests for such an issue, they should be inside the file format tests.
Test build #86132 has finished for PR 19247 at commit …
Thanks for your PR. Could you also add a test to read files generated by the file sink, such as:
test("SPARK-21996 read from text files generated by file sink -- file name has space") {
val testTableName = "FileStreamSourceTest"
withTable(testTableName) {
withTempDirs { case (src, checkpoint) =>
val output = new File(src, "text text")
val inputData = MemoryStream[String]
val ds = inputData.toDS()
val query = ds.writeStream
.option("checkpointLocation", checkpoint.getCanonicalPath)
.format("text")
.start(output.getCanonicalPath)
try {
inputData.addData("foo")
failAfter(streamingTimeout) {
query.processAllAvailable()
}
} finally {
query.stop()
}
val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
val query2 = df2.writeStream.format("memory").queryName(testTableName).start()
try {
query2.processAllAvailable()
checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
} finally {
query2.stop()
}
}
}
}
@@ -86,6 +86,18 @@ abstract class FileStreamSourceTest
     }
   }

+  case class AddTextFileDataWithSpaceInFileName(content: String, src: File, tmp: File)
I would suggest adding a new parameter to AddTextFileData rather than introducing a new class, such as:
case class AddTextFileData(content: String, src: File, tmp: File, tempFilePrefix: String = "text")
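A hedged usage sketch of that signature inside a test, where the content literal and the "text text" prefix are illustrative rather than taken from the PR:

// The default value keeps existing AddTextFileData call sites unchanged,
// while the new test can opt into a temp-file prefix containing a space:
AddTextFileData("keep1\nkeep2", src, tmp, tempFilePrefix = "text text")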
Sure, I will update and add the test for the file sink. Thanks for the review.
Hi @zsxwing, I have pushed the latest changes (for the file sink, I'll be honest, I simply copied your code =p).
Test build #86237 has finished for PR 19247 at commit …
Test build #86241 has finished for PR 19247 at commit …
Test build #86243 has finished for PR 19247 at commit …
retest this please
ok so SparkQA does not listen to me :/
retest this please
Test build #86288 has finished for PR 19247 at commit …
Thanks! Merging to master and 2.3.
## What changes were proposed in this pull request?
Structured streaming is now able to read files with a space in the file name (previously it would skip the file and output a warning).

## How was this patch tested?
Added a new unit test.

Author: Xiayun Sun <xiayunsun@gmail.com>

Closes #19247 from xysun/SPARK-21996.

(cherry picked from commit 0219470)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
What changes were proposed in this pull request?
Structured streaming is now able to read files with a space in the file name (previously it would skip the file and output a warning).
How was this patch tested?
Added a new unit test.
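As a rough end-to-end sketch of the user-facing behavior described above (session setup and paths are illustrative, not from the PR):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("space-in-name").getOrCreate()

// A watched directory that may receive files named e.g. "part 0001.txt" (note the space);
// per this change, such files are read instead of being skipped with a warning.
val lines = spark.readStream.text("/tmp/incoming dir")
val query = lines.writeStream.format("console").start()
query.awaitTermination()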