|
17 | 17 |
|
18 | 18 | package org.apache.spark.sql.execution.streaming
|
19 | 19 |
|
| 20 | +import java.io.File |
| 21 | +import java.net.URI |
| 22 | + |
| 23 | +import scala.util.Random |
| 24 | + |
| 25 | +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} |
| 26 | +import org.scalatest.mock.MockitoSugar |
| 27 | + |
20 | 28 | import org.apache.spark.SparkFunSuite
|
| 29 | +import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._ |
| 30 | +import org.apache.spark.sql.test.SharedSQLContext |
| 31 | +import org.apache.spark.sql.types.StructType |
21 | 32 |
|
22 |
| -class FileStreamSourceSuite extends SparkFunSuite { |
| 33 | +class FileStreamSourceSuite extends SparkFunSuite |
| 34 | + with SharedSQLContext |
| 35 | + with MockitoSugar { |
23 | 36 |
|
24 | 37 | import FileStreamSource._
|
25 | 38 |
|
@@ -73,4 +86,41 @@ class FileStreamSourceSuite extends SparkFunSuite {
|
73 | 86 | assert(map.isNewFile(FileEntry("b", 10)))
|
74 | 87 | }
|
75 | 88 |
|
| 89 | + testWithUninterruptibleThread("do not recheck that files exist during getBatch") { |
| 90 | + withTempDir { temp => |
| 91 | + spark.conf.set( |
| 92 | + s"fs.$scheme.impl", |
| 93 | + classOf[ExistsThrowsExceptionFileSystem].getName) |
| 94 | + // add the metadata entries as a pre-req |
| 95 | + val dir = new File(temp, "dir") // use non-existent directory to test whether log make the dir |
| 96 | + val metadataLog = new HDFSMetadataLog[Array[FileEntry]](spark, dir.getAbsolutePath) |
| 97 | + assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L)))) |
| 98 | + |
| 99 | + val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil), |
| 100 | + dir.getAbsolutePath, Map.empty) |
| 101 | + // this method should throw an exception if `fs.exists` is called during resolveRelation |
| 102 | + newSource.getBatch(None, LongOffset(1)) |
| 103 | + } |
| 104 | + } |
| 105 | +} |
| 106 | + |
| 107 | +/** FakeFileSystem to test fallback of the HDFSMetadataLog from FileContext to FileSystem API */ |
| 108 | +class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem { |
| 109 | + override def getUri: URI = { |
| 110 | + URI.create(s"$scheme:///") |
| 111 | + } |
| 112 | + |
| 113 | + override def exists(f: Path): Boolean = { |
| 114 | + throw new IllegalArgumentException("Exists shouldn't have been called!") |
| 115 | + } |
| 116 | + |
| 117 | + override def listStatus(file: Path): Array[FileStatus] = { |
| 118 | + val emptyFile = new FileStatus() |
| 119 | + emptyFile.setPath(file) |
| 120 | + Array(emptyFile) |
| 121 | + } |
| 122 | +} |
| 123 | + |
| 124 | +object ExistsThrowsExceptionFileSystem { |
| 125 | + val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs" |
76 | 126 | }
|
0 commit comments