
Commit a6bf4cd

added callsite info for context.py

1 parent 0e40e2b

2 files changed: +25 -8 lines changed

python/pyspark/context.py

Lines changed: 13 additions & 1 deletion

@@ -29,6 +29,7 @@
 from pyspark.java_gateway import launch_gateway
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
 from pyspark.storagelevel import StorageLevel
+from pyspark import rdd
 from pyspark.rdd import RDD

 from py4j.java_collections import ListConverter
@@ -83,6 +84,10 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         ...
         ValueError:...
         """
+        if rdd._extract_concise_traceback() is not None:
+            self._callsite = rdd._extract_concise_traceback()
+        else:
+            self._callsite = {"function": None, "file": None, "line": None}
         SparkContext._ensure_initialized(self, gateway=gateway)

         self.environment = environment or {}
@@ -169,7 +174,14 @@ def _ensure_initialized(cls, instance=None, gateway=None):

         if instance:
             if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
-                raise ValueError("Cannot run multiple SparkContexts at once")
+                currentMaster = SparkContext._active_spark_context.master
+                currentAppName = SparkContext._active_spark_context.appName
+                callsite = SparkContext._active_spark_context._callsite
+
+                # Raise error if there is already a running Spark context
+                raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
+                    " created by %s at %s:%s " \
+                    % (currentAppName, currentMaster, callsite['function'], callsite['file'], callsite['line']))
             else:
                 SparkContext._active_spark_context = instance
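The practical effect of the context.py change: constructing a second SparkContext in the same process now reports where the existing one was created. A minimal sketch of the new behavior (the app names and the script path in the sample output are illustrative, not from the commit):

from pyspark import SparkContext

sc = SparkContext("local", "first app")
try:
    # A second context in the same process should now fail with callsite info.
    sc2 = SparkContext("local", "second app")
except ValueError as e:
    # Expected message shape, per the diff above, e.g.:
    #   Cannot run multiple SparkContexts at once; existing
    #   SparkContext(app=first app, master=local) created by __init__
    #   at /path/to/demo.py:4
    print(e)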
python/pyspark/rdd.py

Lines changed: 12 additions & 7 deletions

@@ -42,12 +42,13 @@
 __all__ = ["RDD"]

 def _extract_concise_traceback():
+    """
+    This function returns the traceback info for a callsite, returns a dict
+    with function name, file name and line number
+    """
     tb = traceback.extract_stack()
     if len(tb) == 0:
-        return "I'm lost!"
-    # HACK: This function is in a file called 'rdd.py' in the top level of
-    # everything PySpark. Just trim off the directory name and assume
-    # everything in that tree is PySpark guts.
+        return None
     file, line, module, what = tb[len(tb) - 1]
     sparkpath = os.path.dirname(file)
     first_spark_frame = len(tb) - 1
@@ -58,16 +59,20 @@ def _extract_concise_traceback():
             break
     if first_spark_frame == 0:
         file, line, fun, what = tb[0]
-        return "%s at %s:%d" % (fun, file, line)
+        return {"function": fun, "file": file, "line": line}
     sfile, sline, sfun, swhat = tb[first_spark_frame]
     ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
-    return "%s at %s:%d" % (sfun, ufile, uline)
+    return {"function": sfun, "file": ufile, "line": uline}

 _spark_stack_depth = 0

 class _JavaStackTrace(object):
     def __init__(self, sc):
-        self._traceback = _extract_concise_traceback()
+        tb = _extract_concise_traceback()
+        if tb is not None:
+            self._traceback = "%s at %s:%s" % (tb["function"], tb["file"], tb["line"])
+        else:
+            self._traceback = "Error! Could not extract traceback info"
         self._context = sc

     def __enter__(self):
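Correspondingly, _extract_concise_traceback() now returns a dict (or None) instead of a preformatted string, so callers such as _JavaStackTrace format the callsite themselves. A small sketch of the new contract (the field values shown are illustrative):

from pyspark import rdd

info = rdd._extract_concise_traceback()
if info is not None:
    # e.g. {"function": "<module>", "file": "/path/to/demo.py", "line": 3}
    print("%s at %s:%s" % (info["function"], info["file"], info["line"]))
else:
    print("Could not extract traceback info")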

0 commit comments