Skip to content

Commit 7c6d2d5

Browse files
committed
address comments
1 parent f2949f1 commit 7c6d2d5

File tree

3 files changed

+48
-8
lines changed

3 files changed

+48
-8
lines changed

python/pyspark/sql/functions.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2714,20 +2714,22 @@ def from_csv(col, schema, options={}):
27142714

27152715

27162716
@since(3.0)
2717-
def getActiveSession():
2717+
def _getActiveSession():
27182718
"""
27192719
Returns the active SparkSession for the current thread
2720+
This method is not intended for users to call directly.
2721+
It is only used by the getActiveSession method in session.py
27202722
"""
27212723
from pyspark.sql import SparkSession
27222724
sc = SparkContext._active_spark_context
27232725
if sc is None:
2724-
sc = SparkContext()
2725-
2726-
if sc._jvm.SparkSession.getActiveSession().isDefined():
2727-
SparkSession(sc, sc._jvm.SparkSession.getActiveSession().get())
2728-
return SparkSession._activeSession
2729-
else:
27302726
return None
2727+
else:
2728+
if sc._jvm.SparkSession.getActiveSession().isDefined():
2729+
SparkSession(sc, sc._jvm.SparkSession.getActiveSession().get())
2730+
return SparkSession._activeSession
2731+
else:
2732+
return None
27312733

27322734

27332735
# ---------------------------- User Defined Function ----------------------------------

python/pyspark/sql/session.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ def getActiveSession(cls):
271271
[Row(age=1)]
272272
"""
273273
from pyspark.sql import functions
274-
return functions.getActiveSession()
274+
return functions._getActiveSession()
275275

276276
@property
277277
@since(2.0)
@@ -689,6 +689,8 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
689689
...
690690
Py4JJavaError: ...
691691
"""
692+
SparkSession._activeSession = self
693+
self._jvm.SparkSession.setActiveSession(self._jsparkSession)
692694
if isinstance(data, DataFrame):
693695
raise TypeError("data is already a DataFrame")
694696

python/pyspark/sql/tests.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3965,6 +3965,42 @@ def test_create_new_session_if_old_session_stopped(self):
39653965
finally:
39663966
newSession.stop()
39673967

3968+
def test_active_session_with_None_and_not_None_context(self):
3969+
from pyspark.context import SparkContext
3970+
from pyspark.conf import SparkConf
3971+
sc = SparkContext._active_spark_context
3972+
self.assertEqual(sc, None)
3973+
activeSession = SparkSession.getActiveSession()
3974+
self.assertEqual(activeSession, None)
3975+
sparkConf = SparkConf()
3976+
sc = SparkContext.getOrCreate(sparkConf)
3977+
activeSession = sc._jvm.SparkSession.getActiveSession()
3978+
self.assertFalse(activeSession.isDefined())
3979+
session = SparkSession(sc)
3980+
activeSession = sc._jvm.SparkSession.getActiveSession()
3981+
self.assertTrue(activeSession.isDefined())
3982+
activeSession2 = SparkSession.getActiveSession()
3983+
self.assertNotEqual(activeSession2, None)
3984+
3985+
3986+
class SparkSessionTests3(ReusedSQLTestCase):
3987+
3988+
def test_get_active_session_after_create_dataframe(self):
3989+
activeSession1 = SparkSession.getActiveSession()
3990+
session1 = self.spark
3991+
self.assertEqual(session1, activeSession1)
3992+
session2 = self.spark.newSession()
3993+
activeSession2 = SparkSession.getActiveSession()
3994+
self.assertEqual(session1, activeSession2)
3995+
self.assertNotEqual(session2, activeSession2)
3996+
session2.createDataFrame([(1, 'Alice')], ['age', 'name'])
3997+
activeSession3 = SparkSession.getActiveSession()
3998+
self.assertEqual(session2, activeSession3)
3999+
session1.createDataFrame([(1, 'Alice')], ['age', 'name'])
4000+
activeSession4 = SparkSession.getActiveSession()
4001+
self.assertEqual(session1, activeSession4)
4002+
session2.stop()
4003+
39684004

39694005
class UDFInitializationTests(unittest.TestCase):
39704006
def tearDown(self):

0 commit comments

Comments
 (0)