[Spark-21996][SQL] read files with space in name for streaming #19247

Closed · xysun wants to merge 9 commits into master from xysun/SPARK-21996

Conversation

@xysun (Contributor) commented Sep 15, 2017

What changes were proposed in this pull request?

Structured Streaming is now able to read files with a space in the file name (previously it would skip such files and output a warning).

How was this patch tested?

Added a new unit test.

@xysun (Contributor Author) commented Sep 15, 2017

To handle file names with special characters, we should use URI.getPath to get the decoded path instead of toString, which may contain encoded characters that differ from the original path.

See the Javadoc for java.net.URI.getPath.

While this change fixes the specific issue raised, I did a search and found multiple places in the Spark code where URI.toString is used. Should this be a concern?
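
For illustration, a small, hypothetical example (not from the PR) of the toString / getPath difference described above:

  import java.net.URI

  // The multi-argument URI constructor percent-encodes characters such as spaces,
  // so toString and getPath disagree for file names that contain them.
  val uri = new URI("file", null, "/data/file name.txt", null)

  println(uri.toString) // file:/data/file%20name.txt  (encoded form)
  println(uri.getPath)  // /data/file name.txt         (decoded, matches the on-disk name)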

@xysun (Contributor Author) commented Sep 19, 2017

@Joseph-Torres @brkyvz @lw-lin could you please take a look? (Sorry for the uninvited mentions; I just went by the latest commits on FileStreamSource.)

@@ -233,7 +233,7 @@ class FileStreamSource(
     }

     val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
-      (status.getPath.toUri.toString, status.getModificationTime)
+      (status.getPath.toUri.getPath, status.getModificationTime)
Contributor: why not status.getPath.toString?

Member: ping @xysun ^


zsxwing (Member): FYI, getPath will drop scheme and credentials.

Member: Wait, @zsxwing, do you mean getPath on org.apache.hadoop.fs.Path from org.apache.hadoop.fs.FileStatus drops the scheme and credentials?

Here it seems to be a Seq[org.apache.hadoop.fs.FileStatus], and the code you pointed out looks like an Array[org.apache.spark.sql.execution.streaming.FileStreamSource.FileEntry].

Member: Ah, getPath from java.net.URI in the proposed fix does drop them. Sure.
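
For illustration, a hypothetical URI (the credentials and bucket are made up) showing what java.net.URI.getPath keeps and what it drops:

  import java.net.URI

  // getPath returns only the decoded path component; the scheme, user-info
  // (credentials) and host are not part of it.
  val uri = new URI("s3a://ACCESS_KEY:SECRET@bucket/output/part-00000.txt")

  println(uri.toString) // s3a://ACCESS_KEY:SECRET@bucket/output/part-00000.txt
  println(uri.getPath)  // /output/part-00000.txt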

Contributor Author: Thanks. I'll update the commit.

xysun added 2 commits January 15, 2018 11:59
# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@xysun (Contributor Author) commented Jan 15, 2018

Hi @HyukjinKwon @zsxwing @mgaido91, I have updated the code according to the comments and also merged with the latest master. Please review. Thanks.

@HyukjinKwon (Member): ok to test

@SparkQA commented Jan 15, 2018

Test build #86130 has finished for PR 19247 at commit 2542014.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member): retest this please

@@ -408,6 +420,18 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }

+  test("SPARK-21996 read from text files -- file name has space") {
Contributor: Can we also run the same test for the other input formats, i.e. parquet, orc, ...?

Member: This test should be enough. The issue is in the file stream source.

Contributor: Yes, for this PR it is, but it would be great if we could ensure that all the data sources have the same behavior... Maybe we can do this in another PR if you think that is better.

Member: No, I meant it's not an issue of the file formats; there is no format-specific code in the file stream source. If there should be any tests for such an issue, they should live inside the file format tests.

@SparkQA commented Jan 15, 2018

Test build #86132 has finished for PR 19247 at commit 2542014.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) left a comment

Thanks for your PR. Could you also add a test that reads files generated by the file sink, such as:

  test("SPARK-21996 read from text files generated by file sink -- file name has space") {
    val testTableName = "FileStreamSourceTest"
    withTable(testTableName) {
      withTempDirs { case (src, checkpoint) =>
        val output = new File(src, "text text")
        val inputData = MemoryStream[String]
        val ds = inputData.toDS()

        val query = ds.writeStream
          .option("checkpointLocation", checkpoint.getCanonicalPath)
          .format("text")
          .start(output.getCanonicalPath)

        try {
          inputData.addData("foo")
          failAfter(streamingTimeout) {
            query.processAllAvailable()
          }
        } finally {
          query.stop()
        }

        val df2 = spark.readStream.format("text").load(output.getCanonicalPath)
        val query2 = df2.writeStream.format("memory").queryName(testTableName).start()
        try {
          query2.processAllAvailable()
          checkDatasetUnorderly(spark.table(testTableName).as[String], "foo")
        } finally {
          query2.stop()
        }
      }
    }
  }

@@ -86,6 +86,18 @@ abstract class FileStreamSourceTest
     }
   }

+  case class AddTextFileDataWithSpaceInFileName(content: String, src: File, tmp: File)
@zsxwing (Member) commented Jan 16, 2018

I would suggest adding a new parameter to AddTextFileData rather than introducing a new class, such as:

case class AddTextFileData(content: String, src: File, tmp: File, tempFilePrefix: String = "text")
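
For illustration, a rough sketch (not the exact test body from this PR) of how such a parameter could be used in the SPARK-21996 test, assuming the suite's existing helpers (withTempDirs, createFileStream, testStream, CheckAnswer) and the tmpFilePrefix name that the final commit ends up using:

  test("SPARK-21996 read from text files -- file name has space") {
    withTempDirs { case (src, tmp) =>
      val textStream = createFileStream("text", src.getCanonicalPath)
      val filtered = textStream.filter($"value" contains "keep")

      testStream(filtered)(
        // a prefix containing a space ("text text") yields a temp file whose name
        // contains a space, which the source previously skipped with a warning
        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp, tmpFilePrefix = "text text"),
        CheckAnswer("keep2", "keep3")
      )
    }
  }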

Contributor Author: Sure, I will update the code and add the test for the file sink. Thanks for the review.

@xysun (Contributor Author) commented Jan 17, 2018

Hi @zsxwing, I have pushed the latest changes (for the file sink, I'll be honest, I simply copied your code =p).
I also verified that both tests fail without the fix.
Please review. Thanks.

@SparkQA commented Jan 17, 2018

Test build #86237 has finished for PR 19247 at commit 7342d6c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jan 17, 2018

Test build #86241 has finished for PR 19247 at commit 04c2b14.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AddTextFileData(content: String, src: File, tmp: File, tmpFileNamePrefix: String = "text")

@SparkQA commented Jan 17, 2018

Test build #86243 has finished for PR 19247 at commit 10106b3.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AddTextFileData(content: String, src: File, tmp: File, tmpFilePrefix: String = "text")

@xysun (Contributor Author) commented Jan 17, 2018

retest this please


OK, so SparkQA does not listen to me :/

@zsxwing (Member) commented Jan 17, 2018

retest this please

@SparkQA commented Jan 17, 2018

Test build #86288 has finished for PR 19247 at commit 10106b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AddTextFileData(content: String, src: File, tmp: File, tmpFilePrefix: String = "text")

@zsxwing (Member) commented Jan 17, 2018

Thanks! Merging to master and 2.3.

asfgit pushed a commit that referenced this pull request Jan 18, 2018
## What changes were proposed in this pull request?

Structured Streaming is now able to read files with a space in the file name (previously it would skip such files and output a warning).

## How was this patch tested?

Added new unit test.

Author: Xiayun Sun <xiayunsun@gmail.com>

Closes #19247 from xysun/SPARK-21996.

(cherry picked from commit 0219470)
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
asfgit closed this in 0219470 on Jan 18, 2018