
Commit e150249

[SPARK-48843] Adds more tests that would have failed had the fix apache#47271 not been submitted, preventing regressions. Hardens the previous tests to catch the infinite loop caused by head() and first() in Connect, and adds a test that would have hit an infinite loop with BindParameters in Classic.
1 parent bb584de commit e150249

File tree: 2 files changed, +19 −0 lines changed

python/pyspark/sql/tests/connect/test_connect_basic.py
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala
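For context, here is a minimal sketch of the Connect-side pattern the hardened Python test below exercises. The Connect endpoint URL and the sqlText are assumptions (neither appears in this diff); only the expected values, the 2022-12-25 10:30 timestamp and val = 1, come from the added assertions.

import datetime
from pyspark.sql import SparkSession

# Sketch only: the remote URL and the query text are assumed, not taken from the test.
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

# A named-parameter query with a global LIMIT, shaped to match the asserted columns.
df = spark.sql(
    "SELECT TIMESTAMP'2022-12-25 10:30:00' AS date, :val AS val LIMIT 10",
    args={"val": 1},
)

# Before apache#47271, first()/head() on such a parameterized, limited query could loop
# indefinitely on the Connect client; after the fix they return a single Row promptly.
row = df.first()
assert row.date == datetime.datetime(2022, 12, 25, 10, 30)
assert row.val == 1

On the Classic side, the new ParametersSuite test below takes a different route: it wraps the already-parsed plan in a Limit and re-runs the analyzer, so a BindParameters rule that never reaches a fixed point would surface as a hanging test rather than pass silently.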

python/pyspark/sql/tests/connect/test_connect_basic.py

Lines changed: 11 additions & 0 deletions
@@ -22,6 +22,7 @@
 import tempfile
 import io
 from contextlib import redirect_stdout
+import datetime

 from pyspark.util import is_remote_only
 from pyspark.errors import PySparkTypeError, PySparkValueError
@@ -637,6 +638,16 @@ def test_namedargs_with_global_limit(self):
         df2 = self.spark.sql(sqlText, args={"val": 1})
         self.assert_eq(df.toPandas(), df2.toPandas())

+        self.assert_eq(df.first()[0], datetime.datetime(2022, 12, 25, 10, 30))
+        self.assert_eq(df.first().date, datetime.datetime(2022, 12, 25, 10, 30))
+        self.assert_eq(df.first()[1], 1)
+        self.assert_eq(df.first().val, 1)
+
+        self.assert_eq(df.head()[0], datetime.datetime(2022, 12, 25, 10, 30))
+        self.assert_eq(df.head().date, datetime.datetime(2022, 12, 25, 10, 30))
+        self.assert_eq(df.head()[1], 1)
+        self.assert_eq(df.head().val, 1)
+
     def test_sql_with_pos_args(self):
         sqlText = "SELECT *, element_at(?, 1) FROM range(10) WHERE id > ?"
         df = self.connect.sql(sqlText, args=[CF.array(CF.lit(1)), 7])

sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -22,6 +22,7 @@ import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.Limit
 import org.apache.spark.sql.functions.{array, call_function, lit, map, map_from_arrays, map_from_entries, str_to_map, struct}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -633,4 +634,11 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest {
        |""".stripMargin)
    checkAnswer(query, Row("ABC"))
  }
+
+  test("SPARK-48843: Prevent infinite loop with BindParameters") {
+    val df = sql("EXECUTE IMMEDIATE 'SELECT SUM(c1) num_sum FROM VALUES (?), (?) AS t(c1) ' USING 5, 6;")
+    val analyzedPlan = Limit(Literal.create(100), df.queryExecution.initialParsedPlan)
+    spark.sessionState.analyzer.executeAndCheck(analyzedPlan, df.queryExecution.tracker)
+    checkAnswer(df, Row(11))
+  }
 }
