
[SPARK-47274][PYTHON][SQL] Provide more useful context for PySpark DataFrame API errors #45377

Status: Closed · 24 commits

Commits
2c1d5d8 Prototype (itholic, Mar 4, 2024)
376fc46 Merge branch 'master' of https://github.com/apache/spark into error_c… (itholic, Apr 1, 2024)
174a929 Merge branch 'master' of https://github.com/apache/spark into error_c… (itholic, Apr 2, 2024)
8ab1edf Support query context testing and added UTs (itholic, Apr 2, 2024)
5906852 Merge branch 'master' of https://github.com/apache/spark into error_c… (itholic, Apr 3, 2024)
f3a7bd4 resolve comments (itholic, Apr 3, 2024)
bbaa399 Add JIRA pointer for testing (itholic, Apr 3, 2024)
b9f54f1 Silence the linter (itholic, Apr 3, 2024)
c8d98ea Adjusted comments (itholic, Apr 3, 2024)
ef7f1df Merge branch 'master' of https://github.com/apache/spark into error_c… (itholic, Apr 4, 2024)
cc52aab Update displayed string and add comment for PySparkCurrentOrigin (itholic, Apr 5, 2024)
9c323d4 Using queue to ensure multiple call sites can be logged in order and … (itholic, Apr 5, 2024)
f5ad1c4 remove unnecessary comment (itholic, Apr 5, 2024)
4f12dc7 Extends Origin and WithOrigin to PySpark context support (itholic, Apr 8, 2024)
001c71e Reusing fn for PySpark logging (itholic, Apr 9, 2024)
daa08cd Add document for extended PySpark specific logging functions (itholic, Apr 9, 2024)
92faffe remove unused code (itholic, Apr 9, 2024)
2514afb Merge branch 'master' of https://github.com/apache/spark into error_c… (itholic, Apr 9, 2024)
672c176 Address None properly (itholic, Apr 9, 2024)
1304c2b Simplifying (itholic, Apr 9, 2024)
ff4037b Merge branch 'master' of https://github.com/apache/spark into error_c… (itholic, Apr 10, 2024)
1d8df34 Respect spark.sql.stackTracesInDataFrameContext (itholic, Apr 10, 2024)
95f7848 Add captureStackTrace to remove duplication (itholic, Apr 10, 2024)
1dd53ed pysparkLoggingInfo -> pysparkErrorContext (itholic, Apr 10, 2024)
8 changes: 8 additions & 0 deletions python/pyspark/errors/exceptions/captured.py
@@ -409,5 +409,13 @@ def fragment(self) -> str:
    def callSite(self) -> str:
        return str(self._q.callSite())

    def pysparkFragment(self) -> Optional[str]:  # type: ignore[return]
        if self.contextType() == QueryContextType.DataFrame:
            return str(self._q.pysparkFragment())

    def pysparkCallSite(self) -> Optional[str]:  # type: ignore[return]
        if self.contextType() == QueryContextType.DataFrame:
            return str(self._q.pysparkCallSite())

    def summary(self) -> str:
        return str(self._q.summary())
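
For orientation, a minimal sketch of how these accessors might be consumed from user code, assuming the Spark 4.0 error API (`getQueryContext()` on `CapturedException`); the ANSI division-by-zero trigger and the printed values are illustrative assumptions, not output taken from this PR:

from pyspark.sql import SparkSession
from pyspark.errors.exceptions.base import QueryContextType
from pyspark.errors.exceptions.captured import CapturedException

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "true")  # make the division below fail at runtime
df = spark.range(1)

try:
    df.withColumn("bad", df.id / 0).collect()
except CapturedException as e:
    for ctx in e.getQueryContext():
        # Only DataFrame contexts carry the PySpark-specific fields added above;
        # both accessors return None for SQL query contexts.
        if ctx.contextType() == QueryContextType.DataFrame:
            print(ctx.pysparkFragment())  # e.g. "divide" (illustrative)
            print(ctx.pysparkCallSite())  # e.g. "/path/to/app.py:11" (illustrative)
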
37 changes: 36 additions & 1 deletion python/pyspark/sql/column.py
@@ -18,6 +18,7 @@
import sys
import json
import warnings
import inspect
from typing import (
    cast,
    overload,
@@ -174,16 +175,50 @@ def _bin_op(
["Column", Union["Column", "LiteralType", "DecimalLiteral", "DateTimeLiteral"]], "Column"
]:
"""Create a method for given binary operator"""
binary_operator_map = {
"plus": "+",
"minus": "-",
"divide": "/",
"multiply": "*",
"mod": "%",
"equalTo": "=",
"lt": "<",
"leq": "<=",
"geq": ">=",
"gt": ">",
"eqNullSafe": "<=>",
"bitwiseOR": "|",
"bitwiseAND": "&",
"bitwiseXOR": "^",
# Just following JVM rule even if the names of source and target are the same.
"and": "and",
"or": "or",
}

def _(
self: "Column",
other: Union["Column", "LiteralType", "DecimalLiteral", "DateTimeLiteral"],
) -> "Column":
jc = other._jc if isinstance(other, Column) else other
njc = getattr(self._jc, name)(jc)
if name in binary_operator_map:
from pyspark.sql import SparkSession

spark = SparkSession._getActiveSessionOrCreate()
stack = list(reversed(inspect.stack()))
depth = int(
spark.conf.get("spark.sql.stackTracesInDataFrameContext") # type: ignore[arg-type]
)
selected_frames = stack[:depth]
call_sites = [f"{frame.filename}:{frame.lineno}" for frame in selected_frames]
call_site_str = "\n".join(call_sites)

njc = getattr(self._jc, "fn")(binary_operator_map[name], jc, name, call_site_str)
else:
njc = getattr(self._jc, name)(jc)
return Column(njc)

_.__doc__ = doc
_.__name__ = name
return _
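
As a standalone illustration of the call-site capture inside `_` above, this hedged sketch reruns the same `inspect.stack()` pattern outside Spark; `capture_call_sites` and `user_entry_point` are hypothetical names, and the hard-coded depth stands in for the `spark.sql.stackTracesInDataFrameContext` value the diff reads from the session conf:

import inspect

def capture_call_sites(depth: int) -> str:
    # As in the diff: reverse the stack so the outermost (user-facing) frame
    # comes first, then keep only `depth` frames.
    stack = list(reversed(inspect.stack()))
    return "\n".join(f"{frame.filename}:{frame.lineno}" for frame in stack[:depth])

def user_entry_point() -> str:
    return capture_call_sites(depth=2)

print(user_entry_point())  # two "<file>:<line>" entries, outermost call first
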


4 changes: 4 additions & 0 deletions python/pyspark/sql/tests/connect/test_parity_dataframe.py
@@ -30,6 +30,10 @@ def test_help_command(self):
    def test_toDF_with_schema_string(self):
        super().test_toDF_with_schema_string()

    @unittest.skip("Spark Connect does not support DataFrameQueryContext currently.")
    def test_dataframe_error_context(self):
        super().test_dataframe_error_context()


if __name__ == "__main__":
    import unittest
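
For context on the parity pattern used in that test file, a minimal hedged sketch: the Connect suite inherits every classic test and opts out per method, so unsupported features stay visible as skips in test reports instead of silently disappearing (class and method bodies below are illustrative, not copied from the PR):

import unittest

class DataFrameTestsMixin:
    def test_dataframe_error_context(self):
        ...  # exercises DataFrameQueryContext against classic Spark (hypothetical body)

class DataFrameParityTests(DataFrameTestsMixin, unittest.TestCase):
    # Inherit every test; skip only the ones Spark Connect cannot run yet.
    @unittest.skip("Spark Connect does not support DataFrameQueryContext currently.")
    def test_dataframe_error_context(self):
        super().test_dataframe_error_context()

if __name__ == "__main__":
    unittest.main()
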