File tree Expand file tree Collapse file tree 3 files changed +17
-0
lines changed
catalyst/src/test/scala/org/apache/spark/sql/connector
main/scala/org/apache/spark/sql/execution/datasources/v2
test/scala/org/apache/spark/sql/connector Expand file tree Collapse file tree 3 files changed +17
-0
lines changed Original file line number Diff line number Diff line change @@ -34,6 +34,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
3434 protected val tables : util.Map [Identifier , InMemoryTable ] =
3535 new ConcurrentHashMap [Identifier , InMemoryTable ]()
3636
37+ private val invalidatedTables : util.Set [Identifier ] = ConcurrentHashMap .newKeySet()
38+
3739 private var _name : Option [String ] = None
3840
3941 override def initialize (name : String , options : CaseInsensitiveStringMap ): Unit = {
@@ -55,6 +57,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
5557 }
5658 }
5759
60+ override def invalidateTable (ident : Identifier ): Unit = {
61+ invalidatedTables.add(ident)
62+ }
63+
5864 override def createTable (
5965 ident : Identifier ,
6066 schema : StructType ,
@@ -104,6 +110,10 @@ class BasicInMemoryTableCatalog extends TableCatalog {
104110 }
105111 }
106112
113+ def isTableInvalidated (ident : Identifier ): Boolean = {
114+ invalidatedTables.contains(ident)
115+ }
116+
107117 def clearTables (): Unit = {
108118 tables.clear()
109119 }
Original file line number Diff line number Diff line change @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
2020import org .apache .spark .sql .catalyst .InternalRow
2121import org .apache .spark .sql .catalyst .expressions .Attribute
2222import org .apache .spark .sql .connector .catalog .{Identifier , TableCatalog }
23+
2324case class RefreshTableExec (
2425 catalog : TableCatalog ,
2526 ident : Identifier ) extends V2CommandExec {
Original file line number Diff line number Diff line change @@ -1090,7 +1090,13 @@ class DataSourceV2SQLSuite
10901090 val t = " testcat.ns1.ns2.tbl"
10911091 withTable(t) {
10921092 sql(s " CREATE TABLE $t (id bigint, data string) USING foo " )
1093+
1094+ val testCatalog = catalog(" testcat" ).asTableCatalog.asInstanceOf [InMemoryTableCatalog ]
1095+ val identifier = Identifier .of(Array (" ns1" , " ns2" ), " tbl" )
1096+
1097+ assert(! testCatalog.isTableInvalidated(identifier))
10931098 sql(s " REFRESH TABLE $t" )
1099+ assert(testCatalog.isTableInvalidated(identifier))
10941100 }
10951101 }
10961102
You can’t perform that action at this time.
0 commit comments