Skip to content

Commit 4d6856e

Browse files
ibuderMaxGekk
authored andcommitted
[SPARK-41311][SQL][TESTS] Rewrite test RENAME_SRC_PATH_NOT_FOUND to trigger the error from user space
### What changes were proposed in this pull request? This updates the test for error class RENAME_SRC_PATH_NOT_FOUND in QueryExecutionErrorsSuite to trigger the error with user-facing APIs. ### Why are the changes needed? To test the code more directly. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The only change is the modification of the RENAME_SRC_PATH_NOT_FOUND test. ### License This contribution is my original work. I license the work to the project under the project’s open source license. Closes #39348 from ibuder/SPARK-41311. Authored-by: Immanuel Buder <immanuel_buder@intuit.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
1 parent b7a0fc4 commit 4d6856e

File tree

1 file changed

+31
-23
lines changed

1 file changed

+31
-23
lines changed

sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.net.{URI, URL}
2222
import java.sql.{Connection, Driver, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}
2323
import java.util.{Locale, Properties, ServiceConfigurationError}
2424

25-
import org.apache.hadoop.conf.Configuration
2625
import org.apache.hadoop.fs.{LocalFileSystem, Path}
2726
import org.apache.hadoop.fs.permission.FsPermission
2827
import org.mockito.Mockito.{mock, spy, when}
@@ -36,7 +35,6 @@ import org.apache.spark.sql.execution.datasources.orc.OrcTest
3635
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
3736
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
3837
import org.apache.spark.sql.execution.streaming.FileSystemBasedCheckpointFileManager
39-
import org.apache.spark.sql.execution.streaming.state.RenameReturnsFalseFileSystem
4038
import org.apache.spark.sql.functions.{lit, lower, struct, sum, udf}
4139
import org.apache.spark.sql.internal.SQLConf
4240
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.EXCEPTION
@@ -670,28 +668,34 @@ class QueryExecutionErrorsSuite
670668
}
671669

672670
test("RENAME_SRC_PATH_NOT_FOUND: rename the file which source path does not exist") {
673-
var srcPath: Path = null
674-
val e = intercept[SparkFileNotFoundException](
675-
withTempPath { p =>
676-
val conf = new Configuration()
677-
conf.set("fs.test.impl", classOf[RenameReturnsFalseFileSystem].getName)
678-
conf.set("fs.defaultFS", "test:///")
679-
val basePath = new Path(p.getAbsolutePath)
680-
val fm = new FileSystemBasedCheckpointFileManager(basePath, conf)
681-
srcPath = new Path(s"$basePath/file")
682-
assert(!fm.exists(srcPath))
683-
fm.createAtomic(srcPath, overwriteIfPossible = true).cancel()
684-
assert(!fm.exists(srcPath))
685-
val dstPath = new Path(s"$basePath/new_file")
686-
fm.renameTempFile(srcPath, dstPath, true)
671+
withTempPath { p =>
672+
withSQLConf(
673+
"spark.sql.streaming.checkpointFileManagerClass" ->
674+
classOf[FileSystemBasedCheckpointFileManager].getName,
675+
"fs.file.impl" -> classOf[FakeFileSystemNeverExists].getName,
676+
// FileSystem caching could cause a different implementation of fs.file to be used
677+
"fs.file.impl.disable.cache" -> "true"
678+
) {
679+
val checkpointLocation = p.getAbsolutePath
680+
681+
val ds = spark.readStream.format("rate").load()
682+
val e = intercept[SparkFileNotFoundException] {
683+
ds.writeStream
684+
.option("checkpointLocation", checkpointLocation)
685+
.queryName("_")
686+
.format("memory")
687+
.start()
688+
}
689+
690+
val expectedPath = p.toURI
691+
checkError(
692+
exception = e,
693+
errorClass = "RENAME_SRC_PATH_NOT_FOUND",
694+
matchPVals = true,
695+
parameters = Map("sourcePath" -> s"$expectedPath.+")
696+
)
687697
}
688-
)
689-
checkError(
690-
exception = e,
691-
errorClass = "RENAME_SRC_PATH_NOT_FOUND",
692-
parameters = Map(
693-
"sourcePath" -> s"$srcPath"
694-
))
698+
}
695699
}
696700

697701
test("UNSUPPORTED_FEATURE.JDBC_TRANSACTION: the target JDBC server does not support " +
@@ -781,3 +785,7 @@ class FakeFileSystemSetPermission extends LocalFileSystem {
781785
class FakeFileSystemAlwaysExists extends DebugFilesystem {
782786
override def exists(f: Path): Boolean = true
783787
}
788+
789+
class FakeFileSystemNeverExists extends DebugFilesystem {
790+
override def exists(f: Path): Boolean = false
791+
}

0 commit comments

Comments
 (0)