
[SPARK-28556][SQL] QueryExecutionListener should also notify Error #25292

Closed · wants to merge 3 commits
project/MimaExcludes.scala: 6 changes (5 additions, 1 deletion)
@@ -376,7 +376,11 @@ object MimaExcludes {

// [SPARK-28199][SS] Remove deprecated ProcessingTime
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$"),

// [SPARK-28556][SQL] QueryExecutionListener should also notify Error
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
)

// Exclude rules for 2.4.x
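The MiMa exclusions above are needed because widening the `onFailure` parameter from `Exception` to `Throwable` is a source- and binary-incompatible change to a public trait: existing listeners no longer override the method until they are updated. A minimal sketch of an updated downstream implementation (the class name and the println bodies are illustrative, not part of this PR):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative downstream listener, updated to the new signature.
class LoggingQueryExecutionListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
    println(s"$funcName succeeded (duration=$duration)")
  }

  // Was `exception: Exception`; now `error: Throwable`, so java.lang.Error
  // subclasses raised during execution are delivered as well.
  override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
    println(s"$funcName failed: $error")
  }
}
```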
@@ -85,7 +85,7 @@ object SQLExecution {
}.getOrElse(callSite.shortForm)

withSQLConfPropagated(sparkSession) {
var ex: Option[Exception] = None
var ex: Option[Throwable] = None
val startTime = System.nanoTime()
try {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
@@ -99,7 +99,7 @@
time = System.currentTimeMillis()))
body
} catch {
case e: Exception =>
case e: Throwable =>
ex = Some(e)
throw e
} finally {
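The hunk above cuts off inside the `finally` block; the essential change is that both the local variable and the catch clause were widened from `Exception` to `Throwable`, so fatal errors are recorded before being rethrown and can then be reported when the execution-end event is posted. A self-contained sketch of that record-and-rethrow pattern (plain Scala with illustrative names, not Spark's actual code):

```scala
// Self-contained illustration of the record-and-rethrow pattern used above.
object RecordAndRethrow {
  def runAndNotify[T](body: => T)(notify: Option[Throwable] => Unit): T = {
    var failure: Option[Throwable] = None // Option[Throwable], not Option[Exception]
    try {
      body
    } catch {
      case t: Throwable =>                // also catches java.lang.Error
        failure = Some(t)
        throw t                           // the caller still sees the original error
    } finally {
      notify(failure)                     // the "listener" hears about the failure either way
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      runAndNotify { throw new OutOfMemoryError("simulated") } { f =>
        println(s"notified with: $f")     // Some(java.lang.OutOfMemoryError: simulated)
      }
    } catch {
      case _: OutOfMemoryError => println("error still propagated to the caller")
    }
  }
}
```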
@@ -60,7 +60,7 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
@JsonIgnore private[sql] var qe: QueryExecution = null

// The exception object that caused this execution to fail. None if the execution doesn't fail.
@JsonIgnore private[sql] var executionFailure: Option[Exception] = None
@JsonIgnore private[sql] var executionFailure: Option[Throwable] = None
}

/**
@@ -58,12 +58,12 @@ trait QueryExecutionListener {
* @param funcName the name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc.
* @param exception the exception that failed this query.
* @param error the error that failed this query.
*
* @note This can be invoked by multiple different threads.
*/
@DeveloperApi
def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit
Member Author:

This will need a release note.

Member Author:
There is another issue in this API: it passes a private class, QueryExecution, to the user in a public API. I didn't fix that here, as it would require a re-design of this API.

Member:
Yeah, we should sort those problems out. There's another occurrence: Dataset.queryExecution is exposed as an API.

Contributor:
I'm a bit concerned about this change. It broke one of my projects that uses this API. Instead of changing the signature here, did we consider just wrapping the exception?

Contributor:
RE: QueryExecution, I think that has become a developer API at this point; a lot of developers use it to debug things.

Member Author (@zsxwing, Mar 12, 2020):
> wrapping the exception?

This is a good suggestion. It avoids API changes, and it would also let us fix the bug that errors are not sent to the listener in 2.4 (a sketch of the wrapping approach follows this file's diff).

}


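To make the alternative raised in the review thread concrete, here is a rough sketch of what "wrapping the exception" could look like while keeping the old `Exception`-typed signature. Neither the `QueryExecutionErrorWrapper` class nor `toListenerException` exists in Spark or in this PR; both are hypothetical:

```scala
object ListenerErrorWrapping {
  // Hypothetical wrapper exception; illustrative only.
  class QueryExecutionErrorWrapper(cause: Throwable) extends Exception(cause)

  // At the notification site, any Throwable could be converted to an Exception,
  // so listeners compiled against the old onFailure(funcName, qe, exception: Exception)
  // signature would still be told about fatal errors.
  def toListenerException(t: Throwable): Exception = t match {
    case e: Exception => e                                     // pass ordinary exceptions through
    case other        => new QueryExecutionErrorWrapper(other) // wrap Errors and other Throwables
  }
}
```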
@@ -135,7 +135,7 @@ class SessionStateSuite extends SparkFunSuite {
test("fork new session and inherit listener manager") {
class CommandCollector extends QueryExecutionListener {
val commands: ArrayBuffer[String] = ArrayBuffer.empty[String]
override def onFailure(funcName: String, qe: QueryExecution, ex: Exception) : Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable) : Unit = {}
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
commands += funcName
}
@@ -28,7 +28,7 @@ class TestQueryExecutionListener extends QueryExecutionListener {
OnSuccessCall.isOnSuccessCalled.set(true)
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = { }
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = { }
}

/**
@@ -338,7 +338,7 @@ class UDFSuite extends QueryTest with SharedSQLContext {
withTempPath { path =>
var numTotalCachedHit = 0
val listener = new QueryExecutionListener {
override def onFailure(f: String, qe: QueryExecution, e: Exception): Unit = {}
override def onFailure(f: String, qe: QueryExecution, e: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
qe.withCachedData match {
@@ -180,13 +180,13 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSQLContext {
withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> format,
SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> format) {
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
val exceptions = ArrayBuffer.empty[(String, Exception)]
val errors = ArrayBuffer.empty[(String, Throwable)]
val listener = new QueryExecutionListener {
override def onFailure(
funcName: String,
qe: QueryExecution,
exception: Exception): Unit = {
exceptions += funcName -> exception
error: Throwable): Unit = {
errors += funcName -> error
}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
@@ -267,7 +267,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
plan = qe.analyzed

}
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}
}

spark.listenerManager.register(listener)
@@ -36,7 +36,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)]
val listener = new QueryExecutionListener {
// Only test successful case here, so no need to implement `onFailure`
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
metrics += ((funcName, qe, duration))
@@ -63,10 +63,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
}

testQuietly("execute callback functions when a DataFrame action failed") {
val metrics = ArrayBuffer.empty[(String, QueryExecution, Exception)]
val metrics = ArrayBuffer.empty[(String, QueryExecution, Throwable)]
val listener = new QueryExecutionListener {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
metrics += ((funcName, qe, exception))
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
metrics += ((funcName, qe, error))
}

// Only test failed case here, so no need to implement `onSuccess`
@@ -92,7 +92,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
val metrics = ArrayBuffer.empty[Long]
val listener = new QueryExecutionListener {
// Only test successful case here, so no need to implement `onFailure`
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
val metric = qe.executedPlan match {
@@ -132,7 +132,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
val metrics = ArrayBuffer.empty[Long]
val listener = new QueryExecutionListener {
// Only test successful case here, so no need to implement `onFailure`
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
metrics += qe.executedPlan.longMetric("dataSize").value
@@ -172,10 +172,10 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {

test("execute callback functions for DataFrameWriter") {
val commands = ArrayBuffer.empty[(String, LogicalPlan)]
val exceptions = ArrayBuffer.empty[(String, Exception)]
val errors = ArrayBuffer.empty[(String, Throwable)]
val listener = new QueryExecutionListener {
override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
exceptions += funcName -> exception
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
errors += funcName -> error
}

override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
@@ -221,9 +221,9 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext {
spark.range(10).select($"id", $"id").write.insertInto("tab")
}
sparkContext.listenerBus.waitUntilEmpty(1000)
assert(exceptions.length == 1)
assert(exceptions.head._1 == "insertInto")
assert(exceptions.head._2 == e)
assert(errors.length == 1)
assert(errors.head._1 == "insertInto")
assert(errors.head._2 == e)
}
}
}
@@ -57,7 +57,7 @@ private class CountingQueryExecutionListener extends QueryExecutionListener {
CALLBACK_COUNT.incrementAndGet()
}

override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
CALLBACK_COUNT.incrementAndGet()
}

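Taken together, the change means listeners now receive fatal errors in addition to exceptions. A hedged end-to-end sketch of how an application might register a listener against the new API and tell the two apart (the object name and the println handling are illustrative, not taken from this PR or its tests):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

object ListenerRegistrationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("listener-demo").getOrCreate()

    // With the widened signature, a listener can distinguish fatal errors
    // from ordinary exceptions.
    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = ()

      override def onFailure(funcName: String, qe: QueryExecution, error: Throwable): Unit = {
        error match {
          case fatal: Error => println(s"$funcName hit a fatal error: $fatal")
          case e: Exception => println(s"$funcName failed: $e")
          case other        => println(s"$funcName failed with ${other.getClass.getName}")
        }
      }
    })

    spark.range(5).count() // run an action so the listener fires
    spark.stop()
  }
}
```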