Skip to content

Commit 147a98b

Browse files
committed
[SPARK-48991][SQL] Move path initialization into try-catch block in FileStreamSink.hasMetadata
### What changes were proposed in this pull request? This pull request proposed to move path initialization into try-catch block in FileStreamSink.hasMetadata. Then, exceptions from invalid paths can be handled consistently like other path-related exceptions in the current try-catch block. At last, we can make the errors fall into the correct code branches to be handled ### Why are the changes needed? bugfix for improperly handled exceptions in FileStreamSink.hasMetadata ### Does this PR introduce _any_ user-facing change? no, an invalid path is still invalid, but fails in the correct places ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes #47471 from yaooqinn/SPARK-48991. Authored-by: Kent Yao <yao@apache.org> Signed-off-by: Kent Yao <yao@apache.org> (cherry picked from commit d68cde8) Signed-off-by: Kent Yao <yao@apache.org>
1 parent 405478b commit 147a98b

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ object FileStreamSink extends Logging {
4848

4949
path match {
5050
case Seq(singlePath) =>
51-
val hdfsPath = new Path(singlePath)
5251
try {
52+
val hdfsPath = new Path(singlePath)
5353
val fs = hdfsPath.getFileSystem(hadoopConf)
5454
if (fs.isDirectory(hdfsPath)) {
5555
val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,19 @@ abstract class FileStreamSinkSuite extends StreamTest {
650650
}
651651
}
652652
}
653+
654+
test("SPARK-48991: Move path initialization into try-catch block") {
655+
val logAppender = new LogAppender("Assume no metadata directory.")
656+
Seq(null, "", "file:tmp").foreach { path =>
657+
withLogAppender(logAppender) {
658+
assert(!FileStreamSink.hasMetadata(Seq(path), spark.sessionState.newHadoopConf(), conf))
659+
}
660+
661+
assert(logAppender.loggingEvents.map(_.getMessage.getFormattedMessage).contains(
662+
"Assume no metadata directory. Error while looking for metadata directory in the path:" +
663+
s" $path."))
664+
}
665+
}
653666
}
654667

655668
object PendingCommitFilesTrackingManifestFileCommitProtocol {

0 commit comments

Comments
 (0)