
[SPARK-18120 ][SQL] Call QueryExecutionListener callback methods for … #16664


Closed. Wants to merge 5 commits.
20 changes: 19 additions & 1 deletion docs/sql-programming-guide.md
@@ -1300,10 +1300,28 @@ Configuration of in-memory caching can be done using the `setConf` method on `Sp

</table>

## QueryExecutionListener Options
Contributor: this seems like a completely unrelated change to the bug fix.

Use this configuration option to attach query execution listeners

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.sql.queryExecutionListeners</code></td>
<td></td>
<td>
A comma-separated list of classes that implement QueryExecutionListener. When creating a SparkSession,
instances of these listeners will be added to it. These classes needs to have a zero-argument
constructor. If the specified class can't be found or the class specified doesn't have a valid
constructor the SparkSession creation will fail with an exception.
</td>
</tr>
</table>
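
For illustration only (this example is not part of the PR's documentation change), a listener registered through this option could look roughly like the sketch below. The class name `com.example.AuditListener` and the `println` reporting are hypothetical; the four-argument callback signature and `OutputParams` come from this PR.

    package com.example

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

    // Hypothetical listener with the zero-argument constructor required by
    // spark.sql.queryExecutionListeners.
    class AuditListener extends QueryExecutionListener {

      override def onSuccess(
          funcName: String,
          qe: QueryExecution,
          durationNs: Long,
          outputParams: Option[OutputParams]): Unit = {
        val destination = outputParams.flatMap(_.destination).getOrElse("<none>")
        println(s"$funcName succeeded in ${durationNs / 1000000} ms, destination: $destination")
      }

      override def onFailure(
          funcName: String,
          qe: QueryExecution,
          exception: Exception,
          outputParams: Option[OutputParams]): Unit = {
        println(s"$funcName failed: ${exception.getMessage}")
      }
    }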

## Other Configuration Options

The following options can also be used to tune the performance of query execution. It is possible
that these options will be deprecated in future release as more optimizations are performed automatically.
that these options will be deprecated in future release as more optimizations are performed
automatically.

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
8 changes: 7 additions & 1 deletion project/MimaExcludes.scala
@@ -133,7 +133,13 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"),

// [SPARK-18120 ][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onSuccess"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onSuccess"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure")
)
}

52 changes: 46 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,10 +26,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.{OutputParams}

/**
* Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -189,6 +192,32 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
this
}

/**
* Wrap a DataFrameWriter action to track the query execution and time cost, then report to the
* user-registered callback functions.
*
* @param funcName An identifier for the method executing the query
* @param qe the `QueryExecution` object associated with the query
* @param outputParams The output parameters useful for query analysis
* @param action the function that executes the query after which the listener methods get
* called.
*/
private def withAction(
funcName: String,
qe: QueryExecution,
outputParams: OutputParams)(action: => Unit) = {
try {
val start = System.nanoTime()
Contributor: Dataset.withAction will reset metrics of physical plans, shall we do it here? And can we create a general function for both Dataset and DataFrameWriter?

action
val end = System.nanoTime()
df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
} catch {
case e: Exception =>
df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
throw e
}
}
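
As a sketch only (not code from this PR), the kind of shared helper the comment above asks about could look like the following; the name `withCallbacks` and its placement are assumptions, and it relies on the session's `ExecutionListenerManager` being reachable from both call sites:

    // Hypothetical shared helper for Dataset and DataFrameWriter actions.
    private def withCallbacks[A](
        session: SparkSession,
        funcName: String,
        qe: QueryExecution,
        outputParams: Option[OutputParams])(action: => A): A = {
      val start = System.nanoTime()
      try {
        val result = action
        session.listenerManager.onSuccess(funcName, qe, System.nanoTime() - start, outputParams)
        result
      } catch {
        case e: Exception =>
          session.listenerManager.onFailure(funcName, qe, e, outputParams)
          throw e
      }
    }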

/**
* Saves the content of the `DataFrame` at the specified path.
*
@@ -218,7 +247,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
bucketSpec = getBucketSpec,
options = extraOptions.toMap)

dataSource.write(mode, df)
val destination = source match {
case "jdbc" => extraOptions.get(JDBCOptions.JDBC_TABLE_NAME)
Member: For JDBC, the source value might not be jdbc. For example, jDbC, JDBC, org.apache.spark.sql.jdbc.DefaultSource, org.apache.spark.sql.jdbc

Author: Could you please give me some more info. Looking at the DataFrameWriter#jdbc method it sets the source as "jdbc". Are there other places that this source is being set?

Member: For example,

    df.write.format("org.apache.spark.sql.jdbc")
    .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST"))
    .save()

case _ => extraOptions.get("path")
Member: For the external data source connectors, it might not have path.

Author: Yes, for methods like saveAsTable() there is no path. Do you see an issue here?

gatorsmile (Member), Feb 8, 2017:

> outputParams The output parameters in case the method is invoked as a result of a write operation.

It sounds like OutputParams is designed for the write path. It is being used for description? Could we make it more general? For example, using a Map[String, String] like data structure? In the future, we can easily use/extend it for the other code paths?

Contributor:

> Could we make it more general? For example, using a Map[String, String]

Being the person who requested this class instead of an opaque map, I think using an opaque map makes for a really bad user API. The listener now needs to know about "magic keys" that have special meaning, which can vary depending on the destination. So you end up making up some contract that certain keys have some special meanings an all sources need to use them that way, so basically you end up encoding this class in a map.

That being said I'm not super happy with the way JDBC works, because there's still some information embedded in the map. I thought about it a little but didn't come up with a good solution; embedding the table name in the JDBC URI sounds hacky and brittle. Best one I got is a separate field in this class (e.g. serverUri) that can be used to identify the server that is hosting the destination value (not needed for FS-based destinations since it's in the URI, but could be useful in other cases - maybe other table-based systems like Kudu or HBase).

Member: I think we need to make it more general instead of introducing a class for the write path only.

Contributor:

> e.g. calling the save method adds a "path" key to the option map, but is that key name a public API?

yes, it is. e.g. df.write.format("parquet").option("path", some_path).save(), the path is a "magic key" and we've exposed it to users, so path is a public API and if we change it, we will break existing applications.

cloud-fan (Contributor), Feb 9, 2017:
Actually all the "magic keys" in the options used by DataFrameWriter are public APIs, they are not going to change and users need to know about them if they wanna fine-grained control to the write operation.

Contributor:

> Actually all the "magic keys" in the options used by DataFrameWriter are public APIs

That's good to know, but they only seem to be, at best, indirectly documented. The DataFrameWriter API doesn't say anything about the keys used by any of the methods, and sql-programming-guide.md only touches on a handful of them; for example, none of the JDBC keys are documented.

> If you want to introduce an external public interface, we need a careful design. This should be done in a separate PR.

I agree that it needs a careful design and the current one doesn't cover all the options. But this PR is of very marginal value without this information being exposed in some way. If you guys feel strongly that it should be a map and that's it, I guess it will be hard to argue. Then we'll have to do that and document all the keys used internally by Spark and make them public, and promise ourselves that we'll never break them.

My belief is that a more structured type would help here. Since the current code is obviously not enough, we could have something that's more future-proof, like:

    // Generic, just exposes the raw options, no stability guarantee past what SQL API provides.
    class QueryExecutionParams(val options: Map[String, String])

    // For FS-based sources
    class FsOutputParams(val dataSourceType: String, val path: String, options: Map[String, String])
      extends QueryExecutionParams(options)

    // For JDBC
    class JdbcOutputParams(val table: String, val url: String, options: Map[String, String])
      extends QueryExecutionParams(options)

    // Add others that are interesting.

Then listeners can easily handle future params by matching and handling the generic params.

Anyway, my opinion is that a raw map is not a very good API, regardless of API stability; it's hard to use and easy to break. But I'll defer to you guys if you really don't like my suggestions.
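
Continuing that suggestion as a sketch (these classes are the hypothetical hierarchy proposed in the comment above, not APIs in this PR), a listener could match on the concrete parameter types and fall back to the generic one:

    // Sketch only: consuming the hypothetical hierarchy above, assuming its fields are exposed as vals.
    def describe(params: QueryExecutionParams): String = params match {
      case fs: FsOutputParams     => s"wrote ${fs.dataSourceType} output to ${fs.path}"
      case jdbc: JdbcOutputParams => s"wrote to JDBC table ${jdbc.table} at ${jdbc.url}"
      case other                  => s"query ran with options ${other.options}"
    }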

Contributor: Yes those are public APIs.

}
val outputParams = OutputParams(source, destination, extraOptions.toMap)
withAction("save", df.queryExecution, outputParams) {
dataSource.write(mode, df)
}
}

/**
@@ -261,13 +297,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
)
}

df.sparkSession.sessionState.executePlan(
val qe = df.sparkSession.sessionState.executePlan(
InsertIntoTable(
table = UnresolvedRelation(tableIdent),
partition = Map.empty[String, Option[String]],
child = df.logicalPlan,
overwrite = mode == SaveMode.Overwrite,
ifNotExists = false)).toRdd
ifNotExists = false))
val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
withAction("insertInto", qe, outputParams)(qe.toRdd)
}

private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
@@ -324,7 +362,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

private def assertNotPartitioned(operation: String): Unit = {
if (partitioningColumns.isDefined) {
throw new AnalysisException( s"'$operation' does not support partitioning")
throw new AnalysisException(s"'$operation' does not support partitioning")
}
}

@@ -428,8 +466,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
partitionColumnNames = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec
)
df.sparkSession.sessionState.executePlan(
CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
val qe = df.sparkSession.sessionState.executePlan(
CreateTable(tableDesc, mode, Some(df.logicalPlan)))
val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
withAction("saveAsTable", qe, outputParams)(qe.toRdd)
}

/**
13 changes: 11 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,12 +40,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener}
import org.apache.spark.util.Utils


@@ -876,6 +876,9 @@ object SparkSession {
}
session = new SparkSession(sparkContext)
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
for (qeListener <- createQueryExecutionListeners(session.sparkContext.getConf)) {
session.listenerManager.register(qeListener)
}
defaultSession.set(session)

// Register a successfully instantiated context to the singleton. This should be at the
@@ -893,6 +896,12 @@
}
}

private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = {
conf.get(StaticSQLConf.QUERY_EXECUTION_LISTENERS)
.map(Utils.classForName(_))
Member: Nit: -> .map(Utils.classForName)

.map(_.newInstance().asInstanceOf[QueryExecutionListener])
Member: Simply throwing ClassNotFoundException might not be good to end users, if we plan to make this SQL configuration external.

Could you use the try and catch to issue a better error message when we are unable to create/initialize the class? Thanks!

}
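
A sketch of the error handling the comment above asks for (the exception type and the message wording are assumptions, not part of the PR); it wraps the same instantiation logic as the method above:

    private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = {
      conf.get(StaticSQLConf.QUERY_EXECUTION_LISTENERS).map { className =>
        try {
          Utils.classForName(className).newInstance().asInstanceOf[QueryExecutionListener]
        } catch {
          case e: Exception =>
            throw new IllegalArgumentException(
              s"Failed to instantiate query execution listener '$className' configured via " +
                "spark.sql.queryExecutionListeners; the class must be on the classpath and " +
                "have a zero-argument constructor.", e)
        }
      }
    }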

/**
* Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].
*
@@ -1047,4 +1047,14 @@ object StaticSQLConf {
"SQL configuration and the current database.")
.booleanConf
.createWithDefault(false)

val QUERY_EXECUTION_LISTENERS = buildConf("spark.sql.queryExecutionListeners")
.doc("A comma-separated list of classes that implement QueryExecutionListener. When creating " +
"a SparkSession, instances of these listeners will be added to it. These classes " +
"needs to have a zero-argument constructor. If the specified class can't be found or" +
Member: Nit: needs -> need

" the class specified doesn't have a valid constructor the SparkSession creation " +
Member: the class specified -> the specified class

Member: Nit: please move the starting space to the end of the last line.

"will fail with an exception.")
.stringConf
.toSequence
.createWithDefault(Nil)
Member: Not sure whether we should make it internal or external. Let the others decide it. Either is fine to me.

}
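
For context, setting the new option could look like this (the listener class name is hypothetical); note that, as wired up in SparkSession.getOrCreate above, the value is read from the SparkContext's SparkConf, so it needs to be present in that configuration:

    // Via spark-submit:
    //   spark-submit --conf spark.sql.queryExecutionListeners=com.example.AuditListener ...

    // Or programmatically, before any SparkContext/SparkSession exists:
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val conf = new SparkConf()
      .set("spark.sql.queryExecutionListeners", "com.example.AuditListener")
    val spark = SparkSession.builder().config(conf).getOrCreate()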
@@ -44,27 +44,50 @@ trait QueryExecutionListener {
* @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc.
* @param durationNs the execution time for this query in nanoseconds.
*
* @note This can be invoked by multiple different threads.
* @param outputParams The output parameters in case the method is invoked as a result of a
* write operation. In case of a read, this will be `None`.
*/
@DeveloperApi
def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

def onSuccess(
funcName: String,
qe: QueryExecution,
durationNs: Long,
outputParams: Option[OutputParams]): Unit
/**
* A callback function that will be called when a query execution failed.
*
* @param funcName the name of the action that triggered this query.
* @param qe the QueryExecution object that carries detail information like logical plan,
* physical plan, etc.
* @param exception the exception that failed this query.
* @param outputParams The output parameters in case the method is invoked as a result of a
* write operation. In case of a read, this will be `None`.
*
* @note This can be invoked by multiple different threads.
*/
@DeveloperApi
def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
def onFailure(
funcName: String,
qe: QueryExecution,
exception: Exception,
outputParams: Option[OutputParams]): Unit
}
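
As an illustration of the changed signature (a sketch only; `spark` is assumed to be an existing SparkSession), a listener can also be registered programmatically through the session's listener manager. With the changes in this PR, a write such as `df.write.format("parquet").save("/tmp/out")` is expected to invoke onSuccess with funcName "save" and an OutputParams carrying the parquet destination.

    // Sketch: programmatic registration using the four-argument callbacks from this PR.
    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.sql.execution.QueryExecution
    import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

    val reported = ArrayBuffer.empty[String]

    spark.listenerManager.register(new QueryExecutionListener {
      override def onSuccess(
          funcName: String,
          qe: QueryExecution,
          durationNs: Long,
          outputParams: Option[OutputParams]): Unit = {
        reported += s"$funcName -> ${outputParams.map(_.datasourceType).getOrElse("n/a")}"
      }

      override def onFailure(
          funcName: String,
          qe: QueryExecution,
          exception: Exception,
          outputParams: Option[OutputParams]): Unit = {
        reported += s"$funcName failed"
      }
    })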


/**
* Contains extra information useful for query analysis passed on from the methods in
* @see `org.apache.spark.sql.DataFrameWriter` while writing to a datasource
* @param datasourceType type of data source written to like csv, parquet, json, hive, jdbc etc.
* @param destination path or table name written to
* @param options the map containing the output options for the underlying datasource
* specified by using the @see `org.apache.spark.sql.DataFrameWriter#option` method
* @param writeParams will contain any extra information that the write method wants to provide
*/
@DeveloperApi
case class OutputParams(
Contributor: It looks reasonable to provide more information to the listeners for write operations. However, this will be public, I think we should think about it more carefully to get a better design, can we do it later?

Contributor: Sorry arguments to this class seem to have been picked pretty randomly. Can you explain more why these parameters are picked?

datasourceType: String,
destination: Option[String],
options: Map[String, String],
writeParams: Map[String, String] = Map.empty)
/**
* :: Experimental ::
*
@@ -98,18 +121,26 @@ class ExecutionListenerManager private[sql] () extends Logging {
listeners.clear()
}

private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
private[sql] def onSuccess(
funcName: String,
qe: QueryExecution,
duration: Long,
outputParams: Option[OutputParams] = None): Unit = {
readLock {
withErrorHandling { listener =>
listener.onSuccess(funcName, qe, duration)
listener.onSuccess(funcName, qe, duration, outputParams)
}
}
}

private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
private[sql] def onFailure(
funcName: String,
qe: QueryExecution,
exception: Exception,
outputParams: Option[OutputParams] = None): Unit = {
readLock {
withErrorHandling { listener =>
listener.onFailure(funcName, qe, exception)
listener.onFailure(funcName, qe, exception, outputParams)
}
}
}