Skip to content

Commit 0745333

Browse files
chakravarthiTHyukjinKwon
authored andcommitted
[SPARK-27088][SQL] Add a configuration to set log level for each batch at RuleExecutor
## What changes were proposed in this pull request? Similar to #22406 , which has made log level for plan changes by each rule configurable ,this PR is to make log level for plan changes by each batch configurable,and I have reused the same configuration: "spark.sql.optimizer.planChangeLog.level". Config proposed in this PR , spark.sql.optimizer.planChangeLog.batches - enable plan change logging only for a set of specified batches, separated by commas. ## How was this patch tested? Added UT , also tested manually and attached screenshots below. 1)Setting spark.sql.optimizer.planChangeLog.leve to warn. ![settingLogLevelToWarn](https://user-images.githubusercontent.com/45845595/54556730-8803dd00-49df-11e9-95ab-ebb0c8d735ef.png) 2)setting spark.sql.optimizer.planChangeLog.batches to Resolution and Subquery. ![settingBatchestoLog](https://user-images.githubusercontent.com/45845595/54556740-8cc89100-49df-11e9-80ab-fbbbe1ff2cdf.png) 3) plan change logging enabled only for a set of specified batches(Resolution and Subquery) ![batchloggingOp](https://user-images.githubusercontent.com/45845595/54556788-ab2e8c80-49df-11e9-9ae0-57815f552896.png) Closes #24136 from chakravarthiT/logBatches. Lead-authored-by: chakravarthiT <45845595+chakravarthiT@users.noreply.github.com> Co-authored-by: chakravarthiT <tcchakra@gmail.com> Signed-off-by: HyukjinKwon <gurwls223@apache.org>
1 parent 181d190 commit 0745333

File tree

3 files changed

+87
-31
lines changed

3 files changed

+87
-31
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
113113
if (effective) {
114114
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
115115
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
116-
planChangeLogger.log(rule.ruleName, plan, result)
116+
planChangeLogger.logRule(rule.ruleName, plan, result)
117117
}
118118
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
119119
queryExecutionMetrics.incNumExecution(rule.ruleName)
@@ -152,15 +152,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
152152
lastPlan = curPlan
153153
}
154154

155-
if (!batchStartPlan.fastEquals(curPlan)) {
156-
logDebug(
157-
s"""
158-
|=== Result of Batch ${batch.name} ===
159-
|${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
160-
""".stripMargin)
161-
} else {
162-
logTrace(s"Batch ${batch.name} has no effect.")
163-
}
155+
planChangeLogger.logBatch(batch.name, batchStartPlan, curPlan)
164156
}
165157

166158
curPlan
@@ -172,21 +164,46 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
172164

173165
private val logRules = SQLConf.get.optimizerPlanChangeRules.map(Utils.stringToSeq)
174166

175-
def log(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
167+
private val logBatches = SQLConf.get.optimizerPlanChangeBatches.map(Utils.stringToSeq)
168+
169+
def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
176170
if (logRules.isEmpty || logRules.get.contains(ruleName)) {
177-
lazy val message =
171+
def message(): String = {
178172
s"""
179173
|=== Applying Rule ${ruleName} ===
180174
|${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
181175
""".stripMargin
182-
logLevel match {
183-
case "TRACE" => logTrace(message)
184-
case "DEBUG" => logDebug(message)
185-
case "INFO" => logInfo(message)
186-
case "WARN" => logWarning(message)
187-
case "ERROR" => logError(message)
188-
case _ => logTrace(message)
189176
}
177+
178+
logBasedOnLevel(message)
179+
}
180+
}
181+
182+
def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
183+
if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
184+
def message(): String = {
185+
if (!oldPlan.fastEquals(newPlan)) {
186+
s"""
187+
|=== Result of Batch ${batchName} ===
188+
|${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
189+
""".stripMargin
190+
} else {
191+
s"Batch ${batchName} has no effect."
192+
}
193+
}
194+
195+
logBasedOnLevel(message)
196+
}
197+
}
198+
199+
private def logBasedOnLevel(f: => String): Unit = {
200+
logLevel match {
201+
case "TRACE" => logTrace(f)
202+
case "DEBUG" => logDebug(f)
203+
case "INFO" => logInfo(f)
204+
case "WARN" => logWarning(f)
205+
case "ERROR" => logError(f)
206+
case _ => logTrace(f)
190207
}
191208
}
192209
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ object SQLConf {
184184
val OPTIMIZER_PLAN_CHANGE_LOG_LEVEL = buildConf("spark.sql.optimizer.planChangeLog.level")
185185
.internal()
186186
.doc("Configures the log level for logging the change from the original plan to the new " +
187-
"plan after a rule is applied. The value can be 'trace', 'debug', 'info', 'warn', or " +
188-
"'error'. The default log level is 'trace'.")
187+
"plan after a rule or batch is applied. The value can be 'trace', 'debug', 'info', " +
188+
"'warn', or 'error'. The default log level is 'trace'.")
189189
.stringConf
190190
.transform(_.toUpperCase(Locale.ROOT))
191191
.checkValue(logLevel => Set("TRACE", "DEBUG", "INFO", "WARN", "ERROR").contains(logLevel),
@@ -195,9 +195,15 @@ object SQLConf {
195195

196196
val OPTIMIZER_PLAN_CHANGE_LOG_RULES = buildConf("spark.sql.optimizer.planChangeLog.rules")
197197
.internal()
198-
.doc("If this configuration is set, the optimizer will only log plan changes caused by " +
199-
"applying the rules specified in this configuration. The value can be a list of rule " +
200-
"names separated by comma.")
198+
.doc("Configures a list of rules to be logged in the optimizer, in which the rules are " +
199+
"specified by their rule names and separated by comma.")
200+
.stringConf
201+
.createOptional
202+
203+
val OPTIMIZER_PLAN_CHANGE_LOG_BATCHES = buildConf("spark.sql.optimizer.planChangeLog.batches")
204+
.internal()
205+
.doc("Configures a list of batches to be logged in the optimizer, in which the batches " +
206+
"are specified by their batch names and separated by comma.")
201207
.stringConf
202208
.createOptional
203209

@@ -1763,6 +1769,8 @@ class SQLConf extends Serializable with Logging {
17631769

17641770
def optimizerPlanChangeRules: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_RULES)
17651771

1772+
def optimizerPlanChangeBatches: Option[String] = getConf(OPTIMIZER_PLAN_CHANGE_LOG_BATCHES)
1773+
17661774
def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)
17671775

17681776
def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,20 @@ import org.apache.spark.sql.internal.SQLConf
3232
class OptimizerLoggingSuite extends PlanTest {
3333

3434
object Optimize extends RuleExecutor[LogicalPlan] {
35-
val batches = Batch("Optimizer Batch", FixedPoint(100),
36-
PushDownPredicate,
37-
ColumnPruning,
38-
CollapseProject) :: Nil
35+
val batches =
36+
Batch("Optimizer Batch", FixedPoint(100),
37+
PushDownPredicate, ColumnPruning, CollapseProject) ::
38+
Batch("Batch Has No Effect", Once,
39+
ColumnPruning) :: Nil
3940
}
4041

4142
class MockAppender extends AppenderSkeleton {
4243
val loggingEvents = new ArrayBuffer[LoggingEvent]()
4344

4445
override def append(loggingEvent: LoggingEvent): Unit = {
45-
if (loggingEvent.getRenderedMessage().contains("Applying Rule")) {
46+
if (loggingEvent.getRenderedMessage().contains("Applying Rule") ||
47+
loggingEvent.getRenderedMessage().contains("Result of Batch") ||
48+
loggingEvent.getRenderedMessage().contains("has no effect")) {
4649
loggingEvents.append(loggingEvent)
4750
}
4851
}
@@ -51,7 +54,18 @@ class OptimizerLoggingSuite extends PlanTest {
5154
override def requiresLayout(): Boolean = false
5255
}
5356

54-
private def verifyLog(expectedLevel: Level, expectedRules: Seq[String]): Unit = {
57+
private def withLogLevelAndAppender(level: Level, appender: Appender)(f: => Unit): Unit = {
58+
val logger = Logger.getLogger(Optimize.getClass.getName.dropRight(1))
59+
val restoreLevel = logger.getLevel
60+
logger.setLevel(level)
61+
logger.addAppender(appender)
62+
try f finally {
63+
logger.setLevel(restoreLevel)
64+
logger.removeAppender(appender)
65+
}
66+
}
67+
68+
private def verifyLog(expectedLevel: Level, expectedRulesOrBatches: Seq[String]): Unit = {
5569
val logAppender = new MockAppender()
5670
withLogAppender(logAppender,
5771
loggerName = Some(Optimize.getClass.getName.dropRight(1)), level = Some(Level.TRACE)) {
@@ -61,7 +75,8 @@ class OptimizerLoggingSuite extends PlanTest {
6175
comparePlans(Optimize.execute(query), expected)
6276
}
6377
val logMessages = logAppender.loggingEvents.map(_.getRenderedMessage)
64-
assert(expectedRules.forall(rule => logMessages.exists(_.contains(rule))))
78+
assert(expectedRulesOrBatches.forall
79+
(ruleOrBatch => logMessages.exists(_.contains(ruleOrBatch))))
6580
assert(logAppender.loggingEvents.forall(_.getLevel == expectedLevel))
6681
}
6782

@@ -135,4 +150,20 @@ class OptimizerLoggingSuite extends PlanTest {
135150
}
136151
}
137152
}
153+
154+
test("test log batches which change the plan") {
155+
withSQLConf(
156+
SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Optimizer Batch",
157+
SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
158+
verifyLog(Level.INFO, Seq("Optimizer Batch"))
159+
}
160+
}
161+
162+
test("test log batches which do not change the plan") {
163+
withSQLConf(
164+
SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_BATCHES.key -> "Batch Has No Effect",
165+
SQLConf.OPTIMIZER_PLAN_CHANGE_LOG_LEVEL.key -> "INFO") {
166+
verifyLog(Level.INFO, Seq("Batch Has No Effect"))
167+
}
168+
}
138169
}

0 commit comments

Comments
 (0)