-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-48459][CONNECT][PYTHON] Implement DataFrameQueryContext in Spark Connect #46789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils | |
import org.json4s.JsonDSL._ | ||
import org.json4s.jackson.JsonMethods | ||
|
||
import org.apache.spark.{SparkEnv, SparkException, SparkThrowable} | ||
import org.apache.spark.{QueryContextType, SparkEnv, SparkException, SparkThrowable} | ||
import org.apache.spark.api.python.PythonException | ||
import org.apache.spark.connect.proto.FetchErrorDetailsResponse | ||
import org.apache.spark.internal.{Logging, MDC} | ||
|
@@ -118,15 +118,27 @@ private[connect] object ErrorUtils extends Logging { | |
sparkThrowableBuilder.setErrorClass(sparkThrowable.getErrorClass) | ||
} | ||
for (queryCtx <- sparkThrowable.getQueryContext) { | ||
sparkThrowableBuilder.addQueryContexts( | ||
FetchErrorDetailsResponse.QueryContext | ||
.newBuilder() | ||
val builder = FetchErrorDetailsResponse.QueryContext | ||
.newBuilder() | ||
val context = if (queryCtx.contextType() == QueryContextType.SQL) { | ||
builder | ||
.setContextType(FetchErrorDetailsResponse.QueryContext.ContextType.SQL) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. did we never set this before? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah .. so it has been always |
||
.setObjectType(queryCtx.objectType()) | ||
.setObjectName(queryCtx.objectName()) | ||
.setStartIndex(queryCtx.startIndex()) | ||
.setStopIndex(queryCtx.stopIndex()) | ||
.setFragment(queryCtx.fragment()) | ||
.build()) | ||
.setSummary(queryCtx.summary()) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. same? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, so we did not have |
||
.build() | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this really an unconditional else? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For now, yes because we only have |
||
builder | ||
.setContextType(FetchErrorDetailsResponse.QueryContext.ContextType.DATAFRAME) | ||
.setFragment(queryCtx.fragment()) | ||
.setCallSite(queryCtx.callSite()) | ||
.setSummary(queryCtx.summary()) | ||
.build() | ||
} | ||
sparkThrowableBuilder.addQueryContexts(context) | ||
} | ||
if (sparkThrowable.getSqlState != null) { | ||
sparkThrowableBuilder.setSqlState(sparkThrowable.getSqlState) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -166,7 +166,14 @@ def getQueryContext(self) -> List[BaseQueryContext]: | |
if self._origin is not None and is_instance_of( | ||
gw, self._origin, "org.apache.spark.SparkThrowable" | ||
): | ||
return [QueryContext(q) for q in self._origin.getQueryContext()] | ||
contexts: List[BaseQueryContext] = [] | ||
for q in self._origin.getQueryContext(): | ||
if q.contextType().toString() == "SQL": | ||
contexts.append(SQLQueryContext(q)) | ||
else: | ||
contexts.append(DataFrameQueryContext(q)) | ||
|
||
return contexts | ||
else: | ||
return [] | ||
|
||
|
@@ -379,17 +386,12 @@ class UnknownException(CapturedException, BaseUnknownException): | |
""" | ||
|
||
|
||
class QueryContext(BaseQueryContext): | ||
class SQLQueryContext(BaseQueryContext): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we consider this a private / developer API? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Only parent class |
||
def __init__(self, q: "JavaObject"): | ||
self._q = q | ||
|
||
def contextType(self) -> QueryContextType: | ||
context_type = self._q.contextType().toString() | ||
assert context_type in ("SQL", "DataFrame") | ||
if context_type == "DataFrame": | ||
return QueryContextType.DataFrame | ||
else: | ||
return QueryContextType.SQL | ||
return QueryContextType.SQL | ||
|
||
def objectType(self) -> str: | ||
return str(self._q.objectType()) | ||
|
@@ -409,13 +411,34 @@ def fragment(self) -> str: | |
def callSite(self) -> str: | ||
return str(self._q.callSite()) | ||
|
||
def pysparkFragment(self) -> Optional[str]: # type: ignore[return] | ||
if self.contextType() == QueryContextType.DataFrame: | ||
return str(self._q.pysparkFragment()) | ||
def summary(self) -> str: | ||
return str(self._q.summary()) | ||
|
||
|
||
class DataFrameQueryContext(BaseQueryContext): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Isn't the type annotation wrong here? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see this is for the classic side. |
||
def __init__(self, q: "JavaObject"): | ||
self._q = q | ||
|
||
def contextType(self) -> QueryContextType: | ||
return QueryContextType.DataFrame | ||
|
||
def objectType(self) -> str: | ||
return str(self._q.objectType()) | ||
|
||
def objectName(self) -> str: | ||
return str(self._q.objectName()) | ||
|
||
def pysparkCallSite(self) -> Optional[str]: # type: ignore[return] | ||
if self.contextType() == QueryContextType.DataFrame: | ||
return str(self._q.pysparkCallSite()) | ||
def startIndex(self) -> int: | ||
return int(self._q.startIndex()) | ||
|
||
def stopIndex(self) -> int: | ||
return int(self._q.stopIndex()) | ||
|
||
def fragment(self) -> str: | ||
return str(self._q.fragment()) | ||
|
||
def callSite(self) -> str: | ||
return str(self._q.callSite()) | ||
|
||
def summary(self) -> str: | ||
return str(self._q.summary()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
are we over-engineering? what else can be put here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's the same as `Relation.RelationCommon`, so it's more for consistency (so we can reuse `Origin` as well for the call site). I think it's fine.