Commit 9a14d74

LuciferYang authored and ilicmarkodb committed
[SPARK-48974][SQL][SS][ML][MLLIB] Use SparkSession.implicits instead of SQLContext.implicits
### What changes were proposed in this pull request?

This PR replaces `SQLContext.implicits` with `SparkSession.implicits` in the Spark codebase.

### Why are the changes needed?

To reduce the usage of `SQLContext` code within Spark's internals.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GitHub Actions.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#47457 from LuciferYang/use-sparksession-implicits.

Lead-authored-by: yangjie01 <yangjie01@baidu.com>
Co-authored-by: YangJie <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
1 parent f15922c commit 9a14d74
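The migration is mechanical: implicit conversions and encoders that were previously imported through a `SQLContext` handle are now imported through the owning `SparkSession`. A minimal standalone sketch of the before/after pattern (the session setup and sample data here are illustrative, not taken from the patch):

```scala
import org.apache.spark.sql.SparkSession

object ImplicitsMigrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // After this commit: import implicits from the SparkSession itself.
    import spark.implicits._
    val ds = Seq("a", "b", "#comment").toDS()

    // Before this commit, internal code reached the same implicits through
    // the Dataset's legacy SQLContext handle:
    //   import ds.sqlContext.implicits._
    // The replacement, when only a Dataset is in scope:
    //   import ds.sparkSession.implicits._
    ds.filter(!$"value".startsWith("#")).show()

    spark.stop()
  }
}
```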

File tree

16 files changed: +28 −28 lines

mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
Lines changed: 1 addition & 1 deletion

```diff
@@ -119,7 +119,7 @@ object MLUtils extends Logging {
       ).resolveRelation(checkFilesExist = false))
       .select("value")
 
-    import lines.sqlContext.implicits._
+    import lines.sparkSession.implicits._
 
     lines.select(trim($"value").as("line"))
       .filter(not((length($"line") === 0).or($"line".startsWith("#"))))
```

mllib/src/test/scala/org/apache/spark/ml/classification/FMClassifierSuite.scala
Lines changed: 2 additions & 2 deletions

```diff
@@ -52,8 +52,8 @@ class FMClassifierSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("FMClassifier: Predictor, Classifier methods") {
-    val sqlContext = smallBinaryDataset.sqlContext
-    import sqlContext.implicits._
+    val session = smallBinaryDataset.sparkSession
+    import session.implicits._
     val fm = new FMClassifier()
 
     val model = fm.fit(smallBinaryDataset)
```

mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
Lines changed: 6 additions & 6 deletions

```diff
@@ -550,8 +550,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("multinomial logistic regression: Predictor, Classifier methods") {
-    val sqlContext = smallMultinomialDataset.sqlContext
-    import sqlContext.implicits._
+    val session = smallMultinomialDataset.sparkSession
+    import session.implicits._
     val mlr = new LogisticRegression().setFamily("multinomial")
 
     val model = mlr.fit(smallMultinomialDataset)
@@ -590,8 +590,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("binary logistic regression: Predictor, Classifier methods") {
-    val sqlContext = smallBinaryDataset.sqlContext
-    import sqlContext.implicits._
+    val session = smallBinaryDataset.sparkSession
+    import session.implicits._
     val lr = new LogisticRegression().setFamily("binomial")
 
     val model = lr.fit(smallBinaryDataset)
@@ -1427,8 +1427,8 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
     val trainer2 = (new LogisticRegression).setFitIntercept(true).setWeightCol("weight")
       .setElasticNetParam(1.0).setRegParam(6.0).setStandardization(false)
 
-    val sqlContext = multinomialDataset.sqlContext
-    import sqlContext.implicits._
+    val session = multinomialDataset.sparkSession
+    import session.implicits._
     val model1 = trainer1.fit(multinomialDataset)
     val model2 = trainer2.fit(multinomialDataset)
```

mllib/src/test/scala/org/apache/spark/ml/recommendation/CollectTopKSuite.scala
Lines changed: 2 additions & 2 deletions

```diff
@@ -29,8 +29,8 @@ class CollectTopKSuite extends MLTest {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    val sqlContext = spark.sqlContext
-    import sqlContext.implicits._
+    val session = spark
+    import session.implicits._
    dataFrame = Seq(
      (0, 3, 54f),
      (0, 4, 44f),
```
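Note that in this hunk and in the LinearRegressionSuite hunk below, the replacement is `val session = spark` followed by an import, rather than `import spark.implicits._` directly. The likely reason: Scala only allows `import x.implicits._` when `x` is a stable identifier (a `val`, not a `def`), and the shared `spark` member of these test traits is presumably a `def`. A hypothetical sketch of the constraint (the `spark` member here is a made-up stand-in for the test trait's accessor):

```scala
import org.apache.spark.sql.SparkSession

object StableIdentifierSketch {
  // Hypothetical stand-in for a test trait that exposes the session as a def.
  def spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

  def main(args: Array[String]): Unit = {
    // import spark.implicits._   // would not compile: "stable identifier required"
    val session = spark           // pin the def to a val first
    import session.implicits._
    val df = Seq((0, 3, 54f), (0, 4, 44f)).toDF("user", "item", "score")
    df.show()
  }
}
```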

mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala
Lines changed: 2 additions & 2 deletions

```diff
@@ -962,8 +962,8 @@ class LinearRegressionSuite extends MLTest with DefaultReadWriteTest with PMMLRe
   }
 
   test("linear regression with weighted samples") {
-    val sqlContext = spark.sqlContext
-    import sqlContext.implicits._
+    val session = spark
+    import session.implicits._
     val numClasses = 0
     def modelEquals(m1: LinearRegressionModel, m2: LinearRegressionModel): Unit = {
       assert(m1.coefficients ~== m2.coefficients relTol 0.01)
```

mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala
Lines changed: 1 addition & 1 deletion

```diff
@@ -220,7 +220,7 @@ object MLTestingUtils extends SparkFunSuite {
       numClasses: Int,
       modelEquals: (M, M) => Unit,
       outlierRatio: Int): Unit = {
-    import data.sqlContext.implicits._
+    import data.sparkSession.implicits._
     val outlierDS = data.withColumn("weight", lit(1.0)).as[Instance].flatMap {
       case Instance(l, w, f) =>
         val outlierLabel = if (numClasses == 0) -l else numClasses - l - 1
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala
Lines changed: 1 addition & 1 deletion

```diff
@@ -32,7 +32,7 @@ object CSVUtils {
     // Note that this was separately made by SPARK-18362. Logically, this should be the same
     // with the one below, `filterCommentAndEmpty` but execution path is different. One of them
     // might have to be removed in the near future if possible.
-    import lines.sqlContext.implicits._
+    import lines.sparkSession.implicits._
     val aliased = lines.toDF("value")
     val nonEmptyLines = aliased.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
```

sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
Lines changed: 4 additions & 4 deletions

```diff
@@ -183,7 +183,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
       .contains(MyQueryStagePrepRule()))
     assert(session.sessionState.columnarRules.contains(
       MyColumnarRule(MyNewQueryStageRule(), MyNewQueryStageRule())))
-    import session.sqlContext.implicits._
+    import session.implicits._
     val data = Seq((100L), (200L), (300L)).toDF("vals").repartition(1)
     val df = data.selectExpr("vals + 1")
     df.collect()
@@ -225,7 +225,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
     session.conf.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
     assert(session.sessionState.columnarRules.contains(
       MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-    import session.sqlContext.implicits._
+    import session.implicits._
     // perform a join to inject a shuffle exchange
     val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
     val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
@@ -283,7 +283,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
     session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
     assert(session.sessionState.columnarRules.contains(
       MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-    import session.sqlContext.implicits._
+    import session.implicits._
     // perform a join to inject a broadcast exchange
     val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
     val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
@@ -327,7 +327,7 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper with Adapt
     try {
       assert(session.sessionState.columnarRules.contains(
         MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-      import session.sqlContext.implicits._
+      import session.implicits._
 
       val input = Seq((100L), (200L), (300L))
       val data = input.toDF("vals").repartition(1)
```

sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
Lines changed: 1 addition & 1 deletion

```diff
@@ -49,7 +49,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
       override def schema: StructType = fakeSchema
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        import spark.implicits._
+        import spark.sparkSession.implicits._
         Seq[Int]().toDS().toDF()
       }
       override def stop(): Unit = {}
```
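This hunk differs from the Dataset-based ones above: in `BlockingSource`, `spark` is presumably the `SQLContext` handed to the source by the streaming machinery, so the fix hops from the context to its owning session via `SQLContext.sparkSession` instead of swapping a Dataset accessor. The same shape recurs in the HiveContextCompatibilitySuite hunk below. A small standalone sketch of that relationship (the session setup is illustrative):

```scala
import org.apache.spark.sql.{SQLContext, SparkSession}

object ContextToSessionSketch {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    val sqlContext: SQLContext = session.sqlContext

    // Every SQLContext wraps a SparkSession; the commit imports implicits
    // from that session instead of from the context itself.
    import sqlContext.sparkSession.implicits._
    println(Seq(1, 2, 3).toDS().count())

    session.stop()
  }
}
```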

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextCompatibilitySuite.scala
Lines changed: 2 additions & 2 deletions

```diff
@@ -54,7 +54,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite {
 
   test("basic operations") {
     val _hc = hc
-    import _hc.implicits._
+    import _hc.sparkSession.implicits._
     val df1 = (1 to 20).map { i => (i, i) }.toDF("a", "x")
     val df2 = (1 to 100).map { i => (i, i % 10, i % 2 == 0) }.toDF("a", "b", "c")
       .select($"a", $"b")
@@ -71,7 +71,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite {
 
   test("basic DDLs") {
     val _hc = hc
-    import _hc.implicits._
+    import _hc.sparkSession.implicits._
     val databases = hc.sql("SHOW DATABASES").collect().map(_.getString(0))
     assert(databases.toSeq == Seq("default"))
     hc.sql("CREATE DATABASE mee_db")
```
