
Commit 9cfc628

LuciferYang authored and dongjoon-hyun committed
[SPARK-50855][CONNECT][TESTS][FOLLOWUP] Refactor TransformWithStateConnectSuite to run DROP TABLE IF EXISTS my_sink in beforeAll/afterEach
### What changes were proposed in this pull request?

This PR refactors `TransformWithStateConnectSuite`:

1. Overrides `beforeAll` to execute `spark.sql("DROP TABLE IF EXISTS my_sink")`, ensuring that no table named `my_sink` exists before any test case in `TransformWithStateConnectSuite` runs.
2. Overrides `afterEach` to execute `spark.sql("DROP TABLE IF EXISTS my_sink")`, ensuring that any leftover `my_sink` table is cleaned up after each test case.
3. Removes the `spark.sql("DROP TABLE IF EXISTS my_sink")` calls from the individual test cases.

### Why are the changes needed?

apache#49488 introduced `TransformWithStateConnectSuite`, in which the test case `transformWithState - batch query` did not drop the table `my_sink` after execution. Additionally, because the suite inherits from `RemoteSparkSession`, its test cases share a Connect server, which caused the test case `Table APIs` in `CatalogSuite` to fail during the Maven daily test:

- https://github.com/apache/spark/actions/runs/13654375212/job/38169921062

![image](https://github.com/user-attachments/assets/bcefe19e-7668-4092-a0e8-955d61bc28e2)

This PR therefore refactors `TransformWithStateConnectSuite` so that the table `my_sink` does not exist before or after the execution of its tests.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Pass GitHub Actions
- Manual check:

```
build/mvn -DskipTests -Pyarn -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pjvm-profiler -Pspark-ganglia-lgpl -Pkinesis-asl clean install
build/mvn test -pl sql/connect/client/jvm -fae
```

Before:

```
CatalogSuite:
- Database APIs
- CatalogMetadata APIs
- Table APIs *** FAILED ***
  Array(Table[name='my_sink', catalog='spark_catalog', database='default', tableType='MANAGED', isTemporary='false']) was not empty (CatalogSuite.scala:91)
Run completed in 3 minutes, 20 seconds.
Total number of tests run: 1474
Suites: completed 36, aborted 0
Tests: succeeded 1473, failed 1, canceled 0, ignored 6, pending 0
*** 1 TEST FAILED ***
```

After:

```
Run completed in 3 minutes, 22 seconds.
Total number of tests run: 1474
Suites: completed 36, aborted 0
Tests: succeeded 1474, failed 0, canceled 0, ignored 6, pending 0
All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#50155 from LuciferYang/SPARK-50855-FOLLOWUP.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
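As a standalone illustration of the lifecycle pattern this PR adopts, the sketch below shows `beforeAll`/`afterEach` cleanup in a plain ScalaTest suite. It is a minimal, hypothetical example, not code from this PR: the suite name and the `dropSink` helper are stand-ins, and the real suite gets `beforeAll` from `RemoteSparkSession` and issues the actual `DROP TABLE` via `spark.sql`.

```scala
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite

// Minimal, hypothetical sketch of the cleanup pattern in this PR; the real
// change mixes BeforeAndAfterEach into TransformWithStateConnectSuite and
// calls spark.sql("DROP TABLE IF EXISTS my_sink") instead of dropSink().
class SinkCleanupSketchSuite
    extends AnyFunSuite
    with BeforeAndAfterAll
    with BeforeAndAfterEach {

  // Stand-in for spark.sql("DROP TABLE IF EXISTS my_sink").
  private def dropSink(): Unit = println("DROP TABLE IF EXISTS my_sink")

  // Runs once before any test: guarantees no leftover table from an
  // earlier suite that shared the same server.
  override protected def beforeAll(): Unit = {
    super.beforeAll()
    dropSink()
  }

  // Runs after every test. Cleanup goes in `try` and the parent teardown
  // in `finally`, so super.afterEach() runs even if the cleanup throws.
  override protected def afterEach(): Unit = {
    try {
      dropSink()
    } finally {
      super.afterEach()
    }
  }

  test("a test that writes to my_sink") {
    // Test body; no per-test DROP TABLE needed anymore.
  }
}
```

Putting `super.afterEach()` in the `finally` block mirrors the diff below: the suite's own cleanup is attempted first, but the parent's teardown is never skipped.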
1 parent eb71443 · commit 9cfc628

File tree

1 file changed: 19 additions, 9 deletions


sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/TransformWithStateConnectSuite.scala

Lines changed: 19 additions & 9 deletions
```diff
@@ -21,6 +21,7 @@ import java.io.{BufferedWriter, File, FileWriter}
 import java.nio.file.Paths
 import java.sql.Timestamp
 
+import org.scalatest.BeforeAndAfterEach
 import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
 import org.scalatest.time.SpanSugar._
@@ -188,7 +189,11 @@ class TTLTestStatefulProcessor
   }
 }
 
-class TransformWithStateConnectSuite extends QueryTest with RemoteSparkSession with Logging {
+class TransformWithStateConnectSuite
+    extends QueryTest
+    with RemoteSparkSession
+    with Logging
+    with BeforeAndAfterEach {
   val testData: Seq[(String, String)] = Seq(("a", "1"), ("b", "1"), ("a", "2"))
   val twsAdditionalSQLConf = Seq(
     "spark.sql.streaming.stateStore.providerClass" ->
@@ -197,13 +202,24 @@ class TransformWithStateConnectSuite extends QueryTest with RemoteSparkSession w
     "spark.sql.session.timeZone" -> "UTC",
     "spark.sql.streaming.noDataMicroBatches.enabled" -> "false")
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.sql("DROP TABLE IF EXISTS my_sink")
+  }
+
+  override protected def afterEach(): Unit = {
+    try {
+      spark.sql("DROP TABLE IF EXISTS my_sink")
+    } finally {
+      super.afterEach()
+    }
+  }
+
   test("transformWithState - streaming with state variable, case class type") {
     withSQLConf(twsAdditionalSQLConf: _*) {
       val session: SparkSession = spark
       import session.implicits._
 
-      spark.sql("DROP TABLE IF EXISTS my_sink")
-
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         testData
@@ -242,7 +258,6 @@ class TransformWithStateConnectSuite extends QueryTest with RemoteSparkSession w
         }
       } finally {
         q.stop()
-        spark.sql("DROP TABLE IF EXISTS my_sink")
       }
     }
   }
@@ -253,8 +268,6 @@ class TransformWithStateConnectSuite extends QueryTest with RemoteSparkSession w
       val session: SparkSession = spark
       import session.implicits._
 
-      spark.sql("DROP TABLE IF EXISTS my_sink")
-
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        testData
@@ -299,7 +312,6 @@ class TransformWithStateConnectSuite extends QueryTest with RemoteSparkSession w
         }
       } finally {
         q.stop()
-        spark.sql("DROP TABLE IF EXISTS my_sink")
       }
     }
   }
@@ -444,8 +456,6 @@ class TransformWithStateConnectSuite extends QueryTest with RemoteSparkSession w
       val session: SparkSession = spark
       import session.implicits._
 
-      spark.sql("DROP TABLE IF EXISTS my_sink")
-
       withTempPath { dir =>
         val path = dir.getCanonicalPath
         testData
```
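For context on the failure this prevents: the `Table APIs` test in `CatalogSuite` expects the shared session to start with no tables, so a `my_sink` leaked by an earlier suite on the same Connect server makes its emptiness assertion fail. The sketch below approximates that kind of check; it is a simplified stand-in (hypothetical object name, plain local session instead of the shared Connect server), not the actual code at CatalogSuite.scala:91.

```scala
import org.apache.spark.sql.SparkSession

// Simplified sketch of the emptiness precondition that a leaked table breaks;
// the real assertion lives in CatalogSuite and runs against a shared
// Spark Connect server rather than a fresh local session.
object ListTablesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("list-tables-sketch")
      .master("local[1]")
      .getOrCreate()
    try {
      // With a leaked `my_sink`, this array contains
      // Table[name='my_sink', ...] and the assertion fails, matching the
      // "was not empty" message in the Before output above.
      val tables = spark.catalog.listTables().collect()
      assert(tables.isEmpty, s"expected no tables, found: ${tables.mkString(", ")}")
    } finally {
      spark.stop()
    }
  }
}
```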
