
[SPARK-45516][CONNECT] Include QueryContext in SparkThrowable proto message #43352


Closed · wants to merge 4 commits
@@ -129,6 +129,10 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
assert(!ex.messageParameters.isEmpty)
assert(ex.getSqlState != null)
assert(!ex.isInternalError)
assert(ex.getQueryContext.length == 1)
assert(ex.getQueryContext.head.startIndex() == 7)
assert(ex.getQueryContext.head.stopIndex() == 7)
assert(ex.getQueryContext.head.fragment() == "x")
assert(
ex.getStackTrace
.find(_.getClassName.contains("org.apache.spark.sql.catalyst.analysis.CheckAnalysis"))
@@ -819,13 +819,39 @@ message FetchErrorDetailsResponse {
int32 line_number = 4;
}

// QueryContext defines the schema for the query context of a SparkThrowable.
// It helps users understand where the error occurs while executing queries.
message QueryContext {
// The object type of the query which throws the exception.
// If the exception is directly from the main query, it should be an empty string.
// Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
string object_type = 1;

// The object name of the query which throws the exception.
// If the exception is directly from the main query, it should be an empty string.
// Otherwise, it should be the object name. For example, a view name "V1".
string object_name = 2;

// The starting index in the query text which throws the exception. The index starts from 0.
int32 start_index = 3;

// The stopping index in the query which throws the exception. The index starts from 0.
int32 stop_index = 4;

// The corresponding fragment of the query which throws the exception.
string fragment = 5;
}

// SparkThrowable defines the schema for SparkThrowable exceptions.
message SparkThrowable {
// Succinct, human-readable, unique, and consistent representation of the error category.
optional string error_class = 1;

- // message parameters for the error framework.
+ // The message parameters for the error framework.
map<string, string> message_parameters = 2;

// The query context of a SparkThrowable.
repeated QueryContext query_contexts = 3;
}

// Error defines the schema for the representing exception.
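To make the new message concrete, here is a minimal Scala sketch of how the generated builders compose. It mirrors the server-side serialization later in this diff; the error class and message parameters are illustrative assumptions, and the offsets match the test expectations above (fragment "x" at indexes 7-7), not anything prescribed by the proto.

import org.apache.spark.connect.proto.FetchErrorDetailsResponse

// A context for an error raised directly by the main query, so
// object_type and object_name stay empty per the field comments above.
val queryContext = FetchErrorDetailsResponse.QueryContext
  .newBuilder()
  .setObjectType("")
  .setObjectName("")
  .setStartIndex(7)
  .setStopIndex(7)
  .setFragment("x")
  .build()

// Attach it to the SparkThrowable message next to the error class and
// message parameters (values here are illustrative only).
val sparkThrowable = FetchErrorDetailsResponse.SparkThrowable
  .newBuilder()
  .setErrorClass("UNRESOLVED_COLUMN.WITH_SUGGESTION")
  .putMessageParameters("objectName", "`x`")
  .addQueryContexts(queryContext)
  .build()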
@@ -27,7 +27,7 @@ import io.grpc.protobuf.StatusProto
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods

- import org.apache.spark.{SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
+ import org.apache.spark.{QueryContext, SparkArithmeticException, SparkArrayIndexOutOfBoundsException, SparkDateTimeException, SparkException, SparkIllegalArgumentException, SparkNumberFormatException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
import org.apache.spark.connect.proto.{FetchErrorDetailsRequest, FetchErrorDetailsResponse, UserContext}
import org.apache.spark.connect.proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub
import org.apache.spark.internal.Logging
@@ -167,10 +167,12 @@ private object GrpcExceptionConverter {
private case class ErrorParams(
message: String,
cause: Option[Throwable],
- // errorClass will only be set if the error is enriched and SparkThrowable.
+ // errorClass will only be set if the error is both enriched and SparkThrowable.
errorClass: Option[String],
- // messageParameters will only be set if the error is enriched and SparkThrowable.
- messageParameters: Map[String, String])
+ // messageParameters will only be set if the error is both enriched and SparkThrowable.
+ messageParameters: Map[String, String],
+ // queryContext will only be set if the error is both enriched and SparkThrowable.
+ queryContext: Array[QueryContext])

private def errorConstructor[T <: Throwable: ClassTag](
throwableCtr: ErrorParams => T): (String, ErrorParams => Throwable) = {
@@ -192,13 +194,15 @@ private object GrpcExceptionConverter {
Origin(),
Origin(),
errorClass = params.errorClass,
- messageParameters = params.messageParameters)),
+ messageParameters = params.messageParameters,
+ queryContext = params.queryContext)),
errorConstructor(params =>
new AnalysisException(
params.message,
cause = params.cause,
errorClass = params.errorClass,
- messageParameters = params.messageParameters)),
+ messageParameters = params.messageParameters,
+ context = params.queryContext)),
errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
@@ -221,7 +225,8 @@
message = params.message,
cause = params.cause.orNull,
errorClass = params.errorClass,
- messageParameters = params.messageParameters)))
+ messageParameters = params.messageParameters,
+ context = params.queryContext)))

/**
* errorsToThrowable reconstructs the exception based on a list of protobuf messages
@@ -247,16 +252,35 @@
val causeOpt =
if (error.hasCauseIdx) Some(errorsToThrowable(error.getCauseIdx, errors)) else None

val errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
Some(error.getSparkThrowable.getErrorClass)
} else None

val messageParameters = if (error.hasSparkThrowable) {
error.getSparkThrowable.getMessageParametersMap.asScala.toMap
} else Map.empty[String, String]

val queryContext = error.getSparkThrowable.getQueryContextsList.asScala.map { queryCtx =>
new QueryContext {
override def objectType(): String = queryCtx.getObjectType

override def objectName(): String = queryCtx.getObjectName

override def startIndex(): Int = queryCtx.getStartIndex

override def stopIndex(): Int = queryCtx.getStopIndex

override def fragment(): String = queryCtx.getFragment
}
}.toArray

val exception = constructor(
ErrorParams(
message = error.getMessage,
cause = causeOpt,
- errorClass = if (error.hasSparkThrowable && error.getSparkThrowable.hasErrorClass) {
-   Some(error.getSparkThrowable.getErrorClass)
- } else None,
- messageParameters = if (error.hasSparkThrowable) {
-   error.getSparkThrowable.getMessageParametersMap.asScala.toMap
- } else Map.empty))
+ errorClass = errorClass,
+ messageParameters = messageParameters,
+ queryContext = queryContext))

if (!error.getStackTraceList.isEmpty) {
exception.setStackTrace(error.getStackTraceList.asScala.toArray.map { stackTraceElement =>
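Once reconstructed, the contexts are available through the standard SparkThrowable API on the client. A minimal usage sketch, assuming spark is a Connect session and the query fails analysis; the query itself is illustrative:

import org.apache.spark.SparkThrowable

try {
  spark.sql("select x").collect()
} catch {
  case e: SparkThrowable =>
    // Each reconstructed QueryContext points back into the query text.
    e.getQueryContext.foreach { ctx =>
      println(s"fragment '${ctx.fragment()}' at offsets " +
        s"[${ctx.startIndex()}, ${ctx.stopIndex()}]")
    }
}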
@@ -115,6 +115,17 @@ private[connect] object ErrorUtils extends Logging {
if (sparkThrowable.getErrorClass != null) {
sparkThrowableBuilder.setErrorClass(sparkThrowable.getErrorClass)
}
for (queryCtx <- sparkThrowable.getQueryContext) {
sparkThrowableBuilder.addQueryContexts(
FetchErrorDetailsResponse.QueryContext
.newBuilder()
.setObjectType(queryCtx.objectType())
.setObjectName(queryCtx.objectName())
.setStartIndex(queryCtx.startIndex())
Member:

Hm, this should actually fail after PR #43334, because DataFrameQueryContext now throws an exception (https://github.com/apache/spark/pull/43334/files#diff-b3bc05fec45cd951053b2876c71c7730b63789cb4336a7537a6654c724db3241R586-R589).

It seems we are missing this information on the Spark Connect client side for now, which is why it appears to work, but we should carry this context for DataFrame operations.
Member:

@MaxGekk I think there's a bug not only here but also without Spark Connect:

scala> try {
     |   spark.range(1).select("a")
     | } catch {
     |     case e: org.apache.spark.sql.catalyst.ExtendedAnalysisException => println(e.getQueryContext)
     | }
[Lorg.apache.spark.QueryContext;@6a9d7514
val res2: Any = ()

It doesn't contain the QueryContext from #43334.

@HyukjinKwon (Member), Dec 15, 2023:

Alright, there are three issues. TL;DR:

  1. Many places do not provide an Origin containing a QueryContext in QueryCompilationErrors, so the QueryContext is often missing (e.g., the case above). cc @MaxGekk

  2. Origin.context is used directly on the executor side, e.g., origin.context in DivideYMInterval. This seems to wrongly return SQLQueryContext for DataFrame operations (from the executor side) in the Spark Connect server, because we do not call withOrigin there. That's why this specific code did not throw an exception from https://github.com/apache/spark/pull/43334/files#diff-b3bc05fec45cd951053b2876c71c7730b63789cb4336a7537a6654c724db3241R586-R589: it has never been a DataFrameContext. cc @heyihong @juliuszsompolski

  3. The current logic in ErrorUtils.scala should handle the case of DataFrameQueryContext, e.g., DataFrameQueryContext.stopIndex() will throw an exception. Should we:

    • Set a default value in DataFrameQueryContext.stopIndex() instead of throwing an exception? @MaxGekk
    • Or make the protobuf message for this optional, and throw an exception from the Spark Connect client side? @heyihong @juliuszsompolski
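A minimal sketch of the kind of tolerant serializer under discussion, assuming the QueryContext accessors may throw for DataFrame contexts. This is hypothetical, not what this PR implements:

import scala.util.Try

import org.apache.spark.QueryContext
import org.apache.spark.connect.proto.FetchErrorDetailsResponse

// Hypothetical variant of the ErrorUtils loop: copy each field only if its
// accessor succeeds, so a DataFrameQueryContext whose stopIndex() throws
// does not fail the whole FetchErrorDetails response.
def queryContextToProto(ctx: QueryContext): FetchErrorDetailsResponse.QueryContext = {
  val builder = FetchErrorDetailsResponse.QueryContext.newBuilder()
  Try(ctx.objectType()).foreach(builder.setObjectType)
  Try(ctx.objectName()).foreach(builder.setObjectName)
  Try(ctx.startIndex()).foreach(builder.setStartIndex)
  Try(ctx.stopIndex()).foreach(builder.setStopIndex)
  Try(ctx.fragment()).foreach(builder.setFragment)
  builder.build()
}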

Contributor:

> We should invoke withOrigin in Spark Connect server.

Do you mean client? The server-side stack trace is not interesting to users trying to understand their own code mistakes.

@HyukjinKwon (Member), Dec 15, 2023:

I made a mistake. I updated my comment.

Contributor Author:

Either setting a default value or making the protobuf field optional should work. It depends on the semantics of DataFrameQueryContext.stopIndex().

cc: @MaxGekk

Member:

Let's make sure we fix this.

.setStopIndex(queryCtx.stopIndex())
.setFragment(queryCtx.fragment())
.build())
}
sparkThrowableBuilder.putAllMessageParameters(sparkThrowable.getMessageParameters)
builder.setSparkThrowable(sparkThrowableBuilder.build())
case _ =>
22 changes: 12 additions & 10 deletions python/pyspark/sql/connect/proto/base_pb2.py

Large diffs are not rendered by default.

69 changes: 68 additions & 1 deletion python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2869,6 +2869,59 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):
self, oneof_group: typing_extensions.Literal["_file_name", b"_file_name"]
) -> typing_extensions.Literal["file_name"] | None: ...

class QueryContext(google.protobuf.message.Message):
Member:

qq: could we do this in the Python client too?

@heyihong (Contributor Author), Oct 13, 2023:

Not yet, if I understand correctly. @gatorsmile has some concerns about whether exposing QueryContext in the PySpark exception APIs makes sense for non-SQL PySpark exceptions. There is some ongoing discussion about this.

"""QueryContext defines the schema for the query context of a SparkThrowable.
It helps users understand where the error occurs while executing queries.
"""

DESCRIPTOR: google.protobuf.descriptor.Descriptor

OBJECT_TYPE_FIELD_NUMBER: builtins.int
OBJECT_NAME_FIELD_NUMBER: builtins.int
START_INDEX_FIELD_NUMBER: builtins.int
STOP_INDEX_FIELD_NUMBER: builtins.int
FRAGMENT_FIELD_NUMBER: builtins.int
object_type: builtins.str
"""The object type of the query which throws the exception.
If the exception is directly from the main query, it should be an empty string.
Otherwise, it should be the exact object type in upper case. For example, a "VIEW".
"""
object_name: builtins.str
"""The object name of the query which throws the exception.
If the exception is directly from the main query, it should be an empty string.
Otherwise, it should be the object name. For example, a view name "V1".
"""
start_index: builtins.int
"""The starting index in the query text which throws the exception. The index starts from 0."""
stop_index: builtins.int
"""The stopping index in the query which throws the exception. The index starts from 0."""
fragment: builtins.str
"""The corresponding fragment of the query which throws the exception."""
def __init__(
self,
*,
object_type: builtins.str = ...,
object_name: builtins.str = ...,
start_index: builtins.int = ...,
stop_index: builtins.int = ...,
fragment: builtins.str = ...,
) -> None: ...
def ClearField(
self,
field_name: typing_extensions.Literal[
"fragment",
b"fragment",
"object_name",
b"object_name",
"object_type",
b"object_type",
"start_index",
b"start_index",
"stop_index",
b"stop_index",
],
) -> None: ...

class SparkThrowable(google.protobuf.message.Message):
"""SparkThrowable defines the schema for SparkThrowable exceptions."""

@@ -2893,18 +2946,30 @@ class FetchErrorDetailsResponse(google.protobuf.message.Message):

ERROR_CLASS_FIELD_NUMBER: builtins.int
MESSAGE_PARAMETERS_FIELD_NUMBER: builtins.int
QUERY_CONTEXTS_FIELD_NUMBER: builtins.int
error_class: builtins.str
"""Succinct, human-readable, unique, and consistent representation of the error category."""
@property
def message_parameters(
self,
) -> google.protobuf.internal.containers.ScalarMap[builtins.str, builtins.str]:
"""message parameters for the error framework."""
"""The message parameters for the error framework."""
@property
def query_contexts(
self,
) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[
global___FetchErrorDetailsResponse.QueryContext
]:
"""The query context of a SparkThrowable."""
def __init__(
self,
*,
error_class: builtins.str | None = ...,
message_parameters: collections.abc.Mapping[builtins.str, builtins.str] | None = ...,
query_contexts: collections.abc.Iterable[
global___FetchErrorDetailsResponse.QueryContext
]
| None = ...,
) -> None: ...
def HasField(
self,
@@ -2921,6 +2986,8 @@
b"error_class",
"message_parameters",
b"message_parameters",
"query_contexts",
b"query_contexts",
],
) -> None: ...
def WhichOneof(