Skip to content

[SPARK-972] Added detailed callsite info for ValueError in context.py (resubmitted) #34

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
from collections import namedtuple

from pyspark import accumulators
from pyspark.accumulators import Accumulator
Expand All @@ -29,6 +30,7 @@
from pyspark.java_gateway import launch_gateway
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD

from py4j.java_collections import ListConverter
Expand Down Expand Up @@ -83,6 +85,11 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
...
ValueError:...
"""
if rdd._extract_concise_traceback() is not None:
self._callsite = rdd._extract_concise_traceback()
else:
tempNamedTuple = namedtuple("Callsite", "function file linenum")
self._callsite = tempNamedTuple(function=None, file=None, linenum=None)
SparkContext._ensure_initialized(self, gateway=gateway)

self.environment = environment or {}
Expand Down Expand Up @@ -169,7 +176,14 @@ def _ensure_initialized(cls, instance=None, gateway=None):

if instance:
if SparkContext._active_spark_context and SparkContext._active_spark_context != instance:
raise ValueError("Cannot run multiple SparkContexts at once")
currentMaster = SparkContext._active_spark_context.master
currentAppName = SparkContext._active_spark_context.appName
callsite = SparkContext._active_spark_context._callsite

# Raise error if there is already a running Spark context
raise ValueError("Cannot run multiple SparkContexts at once; existing SparkContext(app=%s, master=%s)" \
" created by %s at %s:%s " \
% (currentAppName, currentMaster, callsite.function, callsite.file, callsite.linenum))
else:
SparkContext._active_spark_context = instance

Expand Down
21 changes: 14 additions & 7 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from base64 import standard_b64encode as b64enc
import copy
from collections import defaultdict
from collections import namedtuple
from itertools import chain, ifilter, imap
import operator
import os
Expand All @@ -42,12 +43,14 @@
__all__ = ["RDD"]

def _extract_concise_traceback():
"""
This function returns the traceback info for a callsite, returns a dict
with function name, file name and line number
"""
tb = traceback.extract_stack()
callsite = namedtuple("Callsite", "function file linenum")
if len(tb) == 0:
return "I'm lost!"
# HACK: This function is in a file called 'rdd.py' in the top level of
# everything PySpark. Just trim off the directory name and assume
# everything in that tree is PySpark guts.
return None
file, line, module, what = tb[len(tb) - 1]
sparkpath = os.path.dirname(file)
first_spark_frame = len(tb) - 1
Expand All @@ -58,16 +61,20 @@ def _extract_concise_traceback():
break
if first_spark_frame == 0:
file, line, fun, what = tb[0]
return "%s at %s:%d" % (fun, file, line)
return callsite(function=fun, file=file, linenum=line)
sfile, sline, sfun, swhat = tb[first_spark_frame]
ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
return "%s at %s:%d" % (sfun, ufile, uline)
return callsite(function=sfun, file=ufile, linenum=uline)

_spark_stack_depth = 0

class _JavaStackTrace(object):
def __init__(self, sc):
self._traceback = _extract_concise_traceback()
tb = _extract_concise_traceback()
if tb is not None:
self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
else:
self._traceback = "Error! Could not extract traceback info"
self._context = sc

def __enter__(self):
Expand Down