Commit 216b467

lemme try if this can make things faster

xupefei committed Oct 11, 2024
1 parent 3bcda6d commit 216b467

Showing 2 changed files with 88 additions and 86 deletions.
@@ -808,10 +808,8 @@ class SparkSession private(
     // active session once we are done.
     val old = SparkSession.activeThreadSession.get()
     SparkSession.setActiveSession(this)
-    artifactManager.withResources {
-      try block finally {
-        SparkSession.setActiveSession(old)
-      }
-    }
+    try block finally {
+      SparkSession.setActiveSession(old)
+    }
   }
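
In the hunk above, `block` is the by-name parameter of `SparkSession.withActive`, so `try block finally { ... }` restores the previous active session on every exit path; the commit only drops the extra `artifactManager.withResources` scope around it. A minimal self-contained sketch of that set-and-restore idiom (the `String` session type and object name here are illustrative, not Spark's API):

object WithActiveSketch {
  // Thread-local slot standing in for SparkSession.activeThreadSession.
  private val activeThreadSession = new InheritableThreadLocal[String]

  // `block: => T` is a by-name parameter, evaluated inside the try, so the
  // previous active value is always restored, mirroring the hunk above.
  def withActive[T](session: String)(block: => T): T = {
    val old = activeThreadSession.get() // may be null, as in Spark's holder
    activeThreadSession.set(session)
    try block finally activeThreadSession.set(old)
  }

  def main(args: Array[String]): Unit = {
    val result = withActive("session-1") {
      s"running with ${activeThreadSession.get()}"
    }
    println(result) // prints: running with session-1
  }
}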

@@ -120,93 +120,97 @@ object SQLExecution extends Logging {
     val redactedConfigs = sparkSession.sessionState.conf.redactOptions(modifiedConfigs)
 
     withSQLConfPropagated(sparkSession) {
-      withSessionTagsApplied(sparkSession) {
-        var ex: Option[Throwable] = None
-        var isExecutedPlanAvailable = false
-        val startTime = System.nanoTime()
-        val startEvent = SparkListenerSQLExecutionStart(
-          executionId = executionId,
-          rootExecutionId = Some(rootExecutionId),
-          description = desc,
-          details = callSite.longForm,
-          physicalPlanDescription = "",
-          sparkPlanInfo = SparkPlanInfo.EMPTY,
-          time = System.currentTimeMillis(),
-          modifiedConfigs = redactedConfigs,
-          jobTags = sc.getJobTags(),
-          jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
-        )
-        try {
-          body match {
-            case Left(e) =>
-              sc.listenerBus.post(startEvent)
-              throw e
-            case Right(f) =>
-              val planDescriptionMode =
-                ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
-              val planDesc = queryExecution.explainString(planDescriptionMode)
-              val planInfo = try {
-                SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
-              } catch {
-                case NonFatal(e) =>
-                  logDebug("Failed to generate SparkPlanInfo", e)
-                  // If the queryExecution already failed before this, we are not able to generate
-                  // the plan info, so we use an empty graphviz node to make the UI happy
-                  SparkPlanInfo.EMPTY
-              }
-              sc.listenerBus.post(
-                startEvent.copy(physicalPlanDescription = planDesc, sparkPlanInfo = planInfo))
-              isExecutedPlanAvailable = true
-              f()
-          }
-        } catch {
-          case e: Throwable =>
-            ex = Some(e)
-            throw e
-        } finally {
-          val endTime = System.nanoTime()
-          val errorMessage = ex.map {
-            case e: SparkThrowable =>
-              SparkThrowableHelper.getMessage(e, ErrorMessageFormat.PRETTY)
-            case e =>
-              Utils.exceptionString(e)
-          }
-          if (queryExecution.shuffleCleanupMode != DoNotCleanup
-              && isExecutedPlanAvailable) {
-            val shuffleIds = queryExecution.executedPlan match {
-              case ae: AdaptiveSparkPlanExec =>
-                ae.context.shuffleIds.asScala.keys
-              case _ =>
-                Iterable.empty
-            }
-            shuffleIds.foreach { shuffleId =>
-              queryExecution.shuffleCleanupMode match {
-                case RemoveShuffleFiles =>
-                  // Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister
-                  // the shuffle on MapOutputTracker, so that stage retries would be triggered.
-                  // Set blocking to Utils.isTesting to deflake unit tests.
-                  sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting)
-                case SkipMigration =>
-                  SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId)
-                case _ => // this should not happen
-              }
-            }
-          }
-          val event = SparkListenerSQLExecutionEnd(
-            executionId,
-            System.currentTimeMillis(),
-            // Use empty string to indicate no error, as None may mean events generated by old
-            // versions of Spark.
-            errorMessage.orElse(Some("")))
-          // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the
-          // `name` parameter. The `ExecutionListenerManager` only watches SQL executions with
-          // name. We can specify the execution name in more places in the future, so that
-          // `QueryExecutionListener` can track more cases.
-          event.executionName = name
-          event.duration = endTime - startTime
-          event.qe = queryExecution
-          event.executionFailure = ex
-          sc.listenerBus.post(event)
-        }
-      }
+      sparkSession.artifactManager.withResources {
+        withSessionTagsApplied(sparkSession) {
+          var ex: Option[Throwable] = None
+          var isExecutedPlanAvailable = false
+          val startTime = System.nanoTime()
+          val startEvent = SparkListenerSQLExecutionStart(
+            executionId = executionId,
+            rootExecutionId = Some(rootExecutionId),
+            description = desc,
+            details = callSite.longForm,
+            physicalPlanDescription = "",
+            sparkPlanInfo = SparkPlanInfo.EMPTY,
+            time = System.currentTimeMillis(),
+            modifiedConfigs = redactedConfigs,
+            jobTags = sc.getJobTags(),
+            jobGroupId = Option(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
+          )
+          try {
+            body match {
+              case Left(e) =>
+                sc.listenerBus.post(startEvent)
+                throw e
+              case Right(f) =>
+                val planDescriptionMode =
+                  ExplainMode.fromString(sparkSession.sessionState.conf.uiExplainMode)
+                val planDesc = queryExecution.explainString(planDescriptionMode)
+                val planInfo = try {
+                  SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan)
+                } catch {
+                  case NonFatal(e) =>
+                    logDebug("Failed to generate SparkPlanInfo", e)
+                    // If the queryExecution already failed before this, we are not able to
+                    // generate the plan info, so we use an empty graphviz node to make the
+                    // UI happy
+                    SparkPlanInfo.EMPTY
+                }
+                sc.listenerBus.post(
+                  startEvent.copy(physicalPlanDescription = planDesc, sparkPlanInfo = planInfo))
+                isExecutedPlanAvailable = true
+                f()
+            }
+          } catch {
+            case e: Throwable =>
+              ex = Some(e)
+              throw e
+          } finally {
+            val endTime = System.nanoTime()
+            val errorMessage = ex.map {
+              case e: SparkThrowable =>
+                SparkThrowableHelper.getMessage(e, ErrorMessageFormat.PRETTY)
+              case e =>
+                Utils.exceptionString(e)
+            }
+            if (queryExecution.shuffleCleanupMode != DoNotCleanup
+                && isExecutedPlanAvailable) {
+              val shuffleIds = queryExecution.executedPlan match {
+                case ae: AdaptiveSparkPlanExec =>
+                  ae.context.shuffleIds.asScala.keys
+                case _ =>
+                  Iterable.empty
+              }
+              shuffleIds.foreach { shuffleId =>
+                queryExecution.shuffleCleanupMode match {
+                  case RemoveShuffleFiles =>
+                    // Same as what we do in ContextCleaner.doCleanupShuffle, but do not
+                    // unregister the shuffle on MapOutputTracker, so that stage retries would be
+                    // triggered.
+                    // Set blocking to Utils.isTesting to deflake unit tests.
+                    sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting)
+                  case SkipMigration =>
+                    SparkEnv.get.blockManager.migratableResolver.addShuffleToSkip(shuffleId)
+                  case _ => // this should not happen
+                }
+              }
+            }
+            val event = SparkListenerSQLExecutionEnd(
+              executionId,
+              System.currentTimeMillis(),
+              // Use empty string to indicate no error, as None may mean events generated by old
+              // versions of Spark.
+              errorMessage.orElse(Some("")))
+            // Currently only `Dataset.withAction` and `DataFrameWriter.runCommand` specify the
+            // `name` parameter. The `ExecutionListenerManager` only watches SQL executions with
+            // name. We can specify the execution name in more places in the future, so that
+            // `QueryExecutionListener` can track more cases.
+            event.executionName = name
+            event.duration = endTime - startTime
+            event.qe = queryExecution
+            event.executionFailure = ex
+            sc.listenerBus.post(event)
+          }
+        }
+      }
     }
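
Read together with the SparkSession.scala hunk above, the change moves the `artifactManager.withResources` scope out of the per-call `withActive` wrapper and into the per-execution body here, so the resource bookkeeping is entered once per tracked SQL execution rather than on every `withActive` call; given the commit message, that reduced entry count is presumably the intended speedup. A minimal self-contained sketch of that kind of scoping helper, with a hypothetical `ToyArtifactManager` standing in for Spark's `ArtifactManager` (whose implementation is not part of this diff):

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for ArtifactManager: `withResources` installs
// per-session state for the duration of `block` and restores the previous
// state on the way out, counting entries so the cost of the scope is visible.
final class ToyArtifactManager {
  val entries = new AtomicInteger(0)
  private val current = new ThreadLocal[String]

  def withResources[T](block: => T): T = {
    entries.incrementAndGet()        // one entry per wrapped scope
    val previous = current.get()     // save whatever was installed before
    current.set("session-artifacts") // install this session's resources
    try block
    finally current.set(previous)    // restore on every exit path
  }
}

object ScopeMoveSketch extends App {
  val mgr = new ToyArtifactManager

  // Before the commit: the scope was entered inside every withActive call.
  (1 to 1000).foreach { _ => mgr.withResources { () } }

  // After the commit: one scope around the whole execution body.
  mgr.withResources { (1 to 1000).foreach { _ => () } }

  println(mgr.entries.get()) // 1001: 1000 per-call entries vs 1 per execution
}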