[SPARK-18120][SQL] Call QueryExecutionListener callback methods for … #16664
Changes from all commits: 751ded0, b0392ed, 752125a, ecf9f34, a0c7c22
Changed file: DataFrameWriter.scala
@@ -26,10 +26,13 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.{OutputParams}

/**
 * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,

@@ -189,6 +192,32 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    this
  }
  /**
   * Wrap a DataFrameWriter action to track the query execution and time cost, then report to the
   * user-registered callback functions.
   *
   * @param funcName an identifier for the method executing the query
   * @param qe the `QueryExecution` object associated with the query
   * @param outputParams the output parameters useful for query analysis
   * @param action the function that executes the query, after which the listener methods get
   *               called.
   */
  private def withAction(
      funcName: String,
      qe: QueryExecution,
      outputParams: OutputParams)(action: => Unit) = {
    try {
      val start = System.nanoTime()
      action
      val end = System.nanoTime()
      df.sparkSession.listenerManager.onSuccess(funcName, qe, end - start, Some(outputParams))
    } catch {
      case e: Exception =>
        df.sparkSession.listenerManager.onFailure(funcName, qe, e, Some(outputParams))
        throw e
    }
  }
  /**
   * Saves the content of the `DataFrame` at the specified path.
   *
@@ -218,7 +247,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
      bucketSpec = getBucketSpec,
      options = extraOptions.toMap)

    dataSource.write(mode, df)
    val destination = source match {
      case "jdbc" => extraOptions.get(JDBCOptions.JDBC_TABLE_NAME)
Review comment: For JDBC, the source value might not be "jdbc".
Author: Could you please give me some more info? Looking at the DataFrameWriter#jdbc method, it sets the source as "jdbc". Are there other places where this source is being set?
Review comment: For example:

    df.write.format("org.apache.spark.sql.jdbc")
      .options(Map("url" -> url, "dbtable" -> "TEST.SAVETEST"))
      .save()
      case _ => extraOptions.get("path")
Review comment: For the external data source connectors, it might not have a path.
Author: Yes, for methods like saveAsTable() there is no path. Do you see an issue here?
Review comment: It sounds like …
Review comment: Being the person who requested this class instead of an opaque map, I think using an opaque map makes for a really bad user API. The listener now needs to know about "magic keys" that have special meaning, which can vary depending on the destination. So you end up making up some contract that certain keys have special meanings and all sources need to use them that way, so basically you end up encoding this class in a map. That being said, I'm not super happy with the way JDBC works, because there's still some information embedded in the map. I thought about it a little but didn't come up with a good solution; embedding the table name in the JDBC URI sounds hacky and brittle. The best one I got is a separate field in this class (e.g. …).
Review comment: I think we need to make it more general instead of introducing a class for the write path only.
Review comment: Yes, it is. e.g. …
Review comment: Actually all the "magic keys" in the options used by …
Review comment: That's good to know, but they only seem to be, at best, indirectly documented. The …
Review comment: I agree that it needs a careful design and the current one doesn't cover all the options. But this PR is of very marginal value without this information being exposed in some way. If you guys feel strongly that it should be a map and that's it, I guess it will be hard to argue. Then we'll have to do that, document all the keys used internally by Spark, make them public, and promise ourselves that we'll never break them. My belief is that a more structured type would help here. Since the current code is obviously not enough, we could have something that's more future-proof, like the structured type sketched below. Then listeners can easily handle future params by matching and handling the generic params. Anyway, my opinion is that a raw map is not a very good API, regardless of API stability; it's hard to use and easy to break. But I'll defer to you guys if you really don't like my suggestions.
Review comment: Yes, those are public APIs.
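The reviewer's original snippet was not preserved in this thread, so the following is only a hedged reconstruction of the kind of "more future-proof" structured type being argued for: a sealed hierarchy with source-specific cases plus a generic fallback. All names (OutputInfo, JdbcOutputInfo, FileOutputInfo, GenericOutputInfo) are invented for illustration and are not part of the API proposed in this PR.

```scala
// Hedged sketch only; not the API in this PR.
object FutureProofParamsSketch {

  sealed trait OutputInfo {
    def options: Map[String, String]
  }

  // Source-specific cases carry their destination in typed fields instead of "magic keys".
  final case class JdbcOutputInfo(
      url: String,
      table: String,
      options: Map[String, String]) extends OutputInfo

  final case class FileOutputInfo(
      format: String, // e.g. "parquet", "json"
      path: String,
      options: Map[String, String]) extends OutputInfo

  // Generic fallback so listeners keep working when new sources are added later.
  final case class GenericOutputInfo(
      datasourceType: String,
      options: Map[String, String]) extends OutputInfo

  // A listener can match on the cases it understands and fall back on the generic one.
  def describe(info: OutputInfo): String = info match {
    case JdbcOutputInfo(url, table, _)   => s"JDBC table $table at $url"
    case FileOutputInfo(format, path, _) => s"$format files at $path"
    case GenericOutputInfo(tpe, opts)    => s"$tpe write, options: ${opts.keys.mkString(", ")}"
  }
}
```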
    }
    val outputParams = OutputParams(source, destination, extraOptions.toMap)
    withAction("save", df.queryExecution, outputParams) {
      dataSource.write(mode, df)
    }
  }
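To make the new reporting concrete, here is a hedged sketch of what listeners would receive for two typical writes. It assumes an existing DataFrame `df` and a valid JDBC URL `jdbcUrl`; the option keys come from the diff above.

```scala
// Sketch only: `df` is an existing DataFrame and `jdbcUrl` a valid JDBC URL.
df.write.format("parquet").option("path", "/tmp/out").save()
// Listeners receive OutputParams("parquet", Some("/tmp/out"), Map("path" -> "/tmp/out")).

df.write.format("jdbc")
  .options(Map("url" -> jdbcUrl, "dbtable" -> "TEST.SAVETEST"))
  .save()
// Listeners receive OutputParams("jdbc", Some("TEST.SAVETEST"),
//   Map("url" -> jdbcUrl, "dbtable" -> "TEST.SAVETEST")).
```

Note that the second case only matches the `case "jdbc"` arm when the short source name is used; a fully qualified name such as "org.apache.spark.sql.jdbc" would fall through to the `path` branch, which is exactly the concern raised in the review thread above.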
  /**
@@ -261,13 +297,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
      )
    }

    df.sparkSession.sessionState.executePlan(
    val qe = df.sparkSession.sessionState.executePlan(
      InsertIntoTable(
        table = UnresolvedRelation(tableIdent),
        partition = Map.empty[String, Option[String]],
        child = df.logicalPlan,
        overwrite = mode == SaveMode.Overwrite,
        ifNotExists = false)).toRdd
        ifNotExists = false))
    val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
    withAction("insertInto", qe, outputParams)(qe.toRdd)
  }

  private def normalizedParCols: Option[Seq[String]] = partitioningColumns.map { cols =>
@@ -324,7 +362,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

  private def assertNotPartitioned(operation: String): Unit = {
    if (partitioningColumns.isDefined) {
      throw new AnalysisException( s"'$operation' does not support partitioning")
      throw new AnalysisException(s"'$operation' does not support partitioning")
    }
  }
@@ -428,8 +466,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
      partitionColumnNames = partitioningColumns.getOrElse(Nil),
      bucketSpec = getBucketSpec
    )
    df.sparkSession.sessionState.executePlan(
      CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
    val qe = df.sparkSession.sessionState.executePlan(
      CreateTable(tableDesc, mode, Some(df.logicalPlan)))
    val outputParams = OutputParams(source, Some(tableIdent.unquotedString), extraOptions.toMap)
    withAction("saveAsTable", qe, outputParams)(qe.toRdd)
  }
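For table-based writes the reported destination is the unquoted table identifier rather than a path. A hedged illustration follows; the database and table names are made up and `df` is assumed to be an existing DataFrame.

```scala
// Illustration only: assumes a DataFrame `df` and an existing database "sales".
df.write.mode("overwrite").saveAsTable("sales.daily_totals")
// -> withAction("saveAsTable", qe, OutputParams(source, Some("sales.daily_totals"), extraOptions.toMap))

df.write.insertInto("sales.daily_totals")
// -> withAction("insertInto", qe, OutputParams(source, Some("sales.daily_totals"), extraOptions.toMap))
```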
  /**
Changed file: SparkSession.scala
@@ -40,12 +40,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
import org.apache.spark.sql.internal._
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.{DataType, LongType, StructType}
import org.apache.spark.sql.util.ExecutionListenerManager
import org.apache.spark.sql.util.{ExecutionListenerManager, QueryExecutionListener}
import org.apache.spark.util.Utils
@@ -876,6 +876,9 @@ object SparkSession {
        }
        session = new SparkSession(sparkContext)
        options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
        for (qeListener <- createQueryExecutionListeners(session.sparkContext.getConf)) {
          session.listenerManager.register(qeListener)
        }
        defaultSession.set(session)

        // Register a successfully instantiated context to the singleton. This should be at the
@@ -893,6 +896,12 @@ object SparkSession {
      }
    }

  private def createQueryExecutionListeners(conf: SparkConf): Seq[QueryExecutionListener] = {
    conf.get(StaticSQLConf.QUERY_EXECUTION_LISTENERS)
      .map(Utils.classForName(_))
      .map(_.newInstance().asInstanceOf[QueryExecutionListener])

Review comment: Simply throwing … Could you use a try and catch to issue a better error message when we are unable to create/initialize the class? Thanks!

  }
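A hedged usage sketch for this new hook: the listener class name com.example.MyQueryListener is made up, while the config key is the one added in this PR. With this change, listeners can be wired in through configuration alone, and getOrCreate() instantiates and registers them.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// "com.example.MyQueryListener" is a hypothetical class implementing QueryExecutionListener
// with a zero-argument constructor, as required by the config documented below.
val conf = new SparkConf()
  .set("spark.sql.queryExecutionListeners", "com.example.MyQueryListener")

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("listener-demo")
  .config(conf)
  .getOrCreate()
// getOrCreate() calls createQueryExecutionListeners(...) and registers each
// resulting instance with spark.listenerManager.
```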
  /**
   * Creates a [[SparkSession.Builder]] for constructing a [[SparkSession]].
   *
Changed file: SQLConf.scala
@@ -1047,4 +1047,14 @@ object StaticSQLConf {
      "SQL configuration and the current database.")
    .booleanConf
    .createWithDefault(false)

  val QUERY_EXECUTION_LISTENERS = buildConf("spark.sql.queryExecutionListeners")
    .doc("A comma-separated list of classes that implement QueryExecutionListener. When creating " +
      "a SparkSession, instances of these listeners will be added to it. These classes " +
      "needs to have a zero-argument constructor. If the specified class can't be found or" +
      " the class specified doesn't have a valid constructor the SparkSession creation " +

Review comment: Nit: please move the starting space to the end of the last line.

      "will fail with an exception.")
    .stringConf
    .toSequence
    .createWithDefault(Nil)

Review comment: Not sure whether we should make it internal or external. Let the others decide it; either is fine to me.

}
Changed file: QueryExecutionListener.scala
@@ -44,27 +44,50 @@ trait QueryExecutionListener {
   * @param qe the QueryExecution object that carries detail information like logical plan,
   *           physical plan, etc.
   * @param durationNs the execution time for this query in nanoseconds.
   *
   * @note This can be invoked by multiple different threads.
   * @param outputParams the output parameters in case the method is invoked as a result of a
   *                     write operation. In the case of a read this will be `None`.
   */
  @DeveloperApi
  def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit

  def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit

  /**
   * A callback function that will be called when a query execution failed.
   *
   * @param funcName the name of the action that triggered this query.
   * @param qe the QueryExecution object that carries detail information like logical plan,
   *           physical plan, etc.
   * @param exception the exception that failed this query.
   * @param outputParams the output parameters in case the method is invoked as a result of a
   *                     write operation. In the case of a read this will be `None`.
   *
   * @note This can be invoked by multiple different threads.
   */
  @DeveloperApi
  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
  def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit
}
/**
 * Contains extra information useful for query analysis, passed on from the methods in
 * `org.apache.spark.sql.DataFrameWriter` while writing to a datasource.
 *
 * @param datasourceType type of data source written to, like csv, parquet, json, hive, jdbc etc.
 * @param destination path or table name written to
 * @param options the map containing the output options for the underlying datasource,
 *                specified by using the `org.apache.spark.sql.DataFrameWriter#option` method
 * @param writeParams will contain any extra information that the write method wants to provide
 */
@DeveloperApi
case class OutputParams(
    datasourceType: String,
    destination: Option[String],
    options: Map[String, String],
    writeParams: Map[String, String] = Map.empty)

Review comment: It looks reasonable to provide more information to the listeners for write operations. However, this will be public; I think we should think about it more carefully to get a better design. Can we do it later?
Review comment: Sorry, the arguments to this class seem to have been picked pretty randomly. Can you explain more why these parameters were picked?
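To show how the widened callbacks would be consumed, here is a minimal hedged sketch of a listener written against the four-argument signatures above. The class name and log format are invented; it keeps a zero-argument constructor so it could also be named in spark.sql.queryExecutionListeners.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.{OutputParams, QueryExecutionListener}

// Hypothetical listener; only the callback signatures come from this PR.
class WriteAuditListener extends QueryExecutionListener {

  override def onSuccess(
      funcName: String,
      qe: QueryExecution,
      durationNs: Long,
      outputParams: Option[OutputParams]): Unit = {
    val target = outputParams match {
      case Some(OutputParams(tpe, dest, _, _)) => s"$tpe -> ${dest.getOrElse("<no destination>")}"
      case None => "<read or non-write action>"
    }
    // For df.write.save() this logs e.g. "save succeeded: parquet -> /tmp/out in 42.0 ms".
    println(s"$funcName succeeded: $target in ${durationNs / 1e6} ms")
  }

  override def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams]): Unit = {
    println(s"$funcName failed for ${outputParams.flatMap(_.destination).getOrElse("<unknown>")}: " +
      exception.getMessage)
  }
}
```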
/**
 * :: Experimental ::
 *
@@ -98,18 +121,26 @@ class ExecutionListenerManager private[sql] () extends Logging {
    listeners.clear()
  }

  private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
  private[sql] def onSuccess(
      funcName: String,
      qe: QueryExecution,
      duration: Long,
      outputParams: Option[OutputParams] = None): Unit = {
    readLock {
      withErrorHandling { listener =>
        listener.onSuccess(funcName, qe, duration)
        listener.onSuccess(funcName, qe, duration, outputParams)
      }
    }
  }

  private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
  private[sql] def onFailure(
      funcName: String,
      qe: QueryExecution,
      exception: Exception,
      outputParams: Option[OutputParams] = None): Unit = {
    readLock {
      withErrorHandling { listener =>
        listener.onFailure(funcName, qe, exception)
        listener.onFailure(funcName, qe, exception, outputParams)
      }
    }
  }
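Finally, a hedged end-to-end sketch tying the pieces together. It assumes an active SparkSession named spark and the WriteAuditListener class from the earlier example; listeners can also be registered programmatically rather than through the static config, and a write then reaches them via the manager methods above.

```scala
// Assumes `spark` is an active SparkSession and WriteAuditListener is on the classpath.
spark.listenerManager.register(new WriteAuditListener)

// DataFrameWriter.save() wraps dataSource.write(...) in withAction(...), so a successful
// write invokes onSuccess with OutputParams("parquet", Some("/tmp/listener-demo"), ...).
spark.range(10).write.mode("overwrite").parquet("/tmp/listener-demo")
```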
Review comment: This seems like a completely unrelated change to the bug fix.