
Commit 23ce9d7

[SPARK-29779][CORE] Compact old event log files and cleanup
1 parent dd217e1 commit 23ce9d7

23 files changed: +2211 −97 lines
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
org.apache.spark.deploy.history.BasicEventFilterBuilder
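
This one-line file appears to be a java.util.ServiceLoader registration (a META-INF/services entry naming the implementation class), which lets the history server discover every EventFilterBuilder on the classpath. A minimal sketch of how such discovery typically looks; the loader code below is an assumption for illustration, not part of this diff (and since EventFilterBuilder is private[spark], it would have to live inside Spark's own packages):

import java.util.ServiceLoader

import scala.collection.JavaConverters._

// Hypothetical discovery step: ServiceLoader scans META-INF/services entries
// like the one above and instantiates each registered EventFilterBuilder.
val builders: Seq[EventFilterBuilder] =
  ServiceLoader.load(classOf[EventFilterBuilder],
    Thread.currentThread().getContextClassLoader).asScala.toSeq

// Each builder is a SparkListener; after replaying the event log through it,
// createFilter() yields the EventFilter used to accept or reject events.
val filters: Seq[EventFilter] = builders.map(_.createFilter())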
Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import scala.collection.mutable

import org.apache.spark.deploy.history.EventFilter.FilterStatistic
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._

/**
 * This class tracks both live jobs and live executors, and passes the lists to
 * [[BasicEventFilter]] so it can reject finished jobs (and their stages/tasks/RDDs)
 * as well as dead executors.
 */
private[spark] class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
  private val _liveJobToStages = new mutable.HashMap[Int, Seq[Int]]
  private val _stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]
  private val _stageToRDDs = new mutable.HashMap[Int, Seq[Int]]
  private val _liveExecutors = new mutable.HashSet[String]

  private var totalJobs: Long = 0L
  private var totalStages: Long = 0L
  private var totalTasks: Long = 0L

  def liveJobToStages: Map[Int, Seq[Int]] = _liveJobToStages.toMap
  def stageToTasks: Map[Int, Set[Long]] = _stageToTasks.mapValues(_.toSet).toMap
  def stageToRDDs: Map[Int, Seq[Int]] = _stageToRDDs.toMap
  def liveExecutors: Set[String] = _liveExecutors.toSet

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    totalJobs += 1
    totalStages += jobStart.stageIds.length
    _liveJobToStages += jobStart.jobId -> jobStart.stageIds
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    val stages = _liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
    _liveJobToStages -= jobEnd.jobId
    _stageToTasks --= stages
    _stageToRDDs --= stages
  }

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    _stageToRDDs.getOrElseUpdate(stageSubmitted.stageInfo.stageId,
      stageSubmitted.stageInfo.rddInfos.map(_.id))
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    totalTasks += 1
    val curTasks = _stageToTasks.getOrElseUpdate(taskStart.stageId,
      mutable.HashSet[Long]())
    curTasks += taskStart.taskInfo.taskId
  }

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    _liveExecutors += executorAdded.executorId
  }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
    _liveExecutors -= executorRemoved.executorId
  }

  override def createFilter(): EventFilter = new BasicEventFilter(this)

  def statistic(): FilterStatistic = {
    FilterStatistic(totalJobs, liveJobToStages.size, totalStages,
      liveJobToStages.map(_._2.size).sum, totalTasks, _stageToTasks.map(_._2.size).sum)
  }
}

/**
 * This class provides the functionality to reject events related to finished jobs,
 * based on the given information. It only deals with job-related events, and provides
 * a PartialFunction which returns false for events belonging to finished jobs and
 * true otherwise.
 */
private[spark] abstract class JobEventFilter(
    stats: Option[FilterStatistic],
    jobToStages: Map[Int, Seq[Int]],
    stageToTasks: Map[Int, Set[Long]],
    stageToRDDs: Map[Int, Seq[Int]]) extends EventFilter with Logging {

  private val liveTasks: Set[Long] = stageToTasks.values.flatten.toSet
  private val liveRDDs: Set[Int] = stageToRDDs.values.flatten.toSet

  logDebug(s"jobs : ${jobToStages.keySet}")
  logDebug(s"stages in jobs : ${jobToStages.values.flatten}")
  logDebug(s"stages : ${stageToTasks.keySet}")
  logDebug(s"tasks in stages : ${stageToTasks.values.flatten}")
  logDebug(s"RDDs in stages : ${stageToRDDs.values.flatten}")

  override def statistic(): Option[FilterStatistic] = stats

  protected val acceptFnForJobEvents: PartialFunction[SparkListenerEvent, Boolean] = {
    case e: SparkListenerStageCompleted =>
      stageToTasks.contains(e.stageInfo.stageId)

    case e: SparkListenerStageSubmitted =>
      stageToTasks.contains(e.stageInfo.stageId)

    case e: SparkListenerTaskStart =>
      liveTasks.contains(e.taskInfo.taskId)

    case e: SparkListenerTaskGettingResult =>
      liveTasks.contains(e.taskInfo.taskId)

    case e: SparkListenerTaskEnd =>
      liveTasks.contains(e.taskInfo.taskId)

    case e: SparkListenerJobStart =>
      jobToStages.contains(e.jobId)

    case e: SparkListenerJobEnd =>
      jobToStages.contains(e.jobId)

    case e: SparkListenerUnpersistRDD =>
      liveRDDs.contains(e.rddId)

    case e: SparkListenerExecutorMetricsUpdate =>
      e.accumUpdates.exists { case (_, stageId, _, _) =>
        stageToTasks.contains(stageId)
      }

    case e: SparkListenerSpeculativeTaskSubmitted =>
      stageToTasks.contains(e.stageId)
  }
}

/**
 * This class rejects events related to finished jobs or dead executors, based on the
 * given information. Events which are not related to jobs or executors are left
 * unmatched, i.e. treated as "don't care".
 */
private[spark] class BasicEventFilter(
    _stats: FilterStatistic,
    _liveJobToStages: Map[Int, Seq[Int]],
    _stageToTasks: Map[Int, Set[Long]],
    _stageToRDDs: Map[Int, Seq[Int]],
    liveExecutors: Set[String])
  extends JobEventFilter(Some(_stats), _liveJobToStages, _stageToTasks, _stageToRDDs) with Logging {

  def this(builder: BasicEventFilterBuilder) = {
    this(builder.statistic(), builder.liveJobToStages, builder.stageToTasks, builder.stageToRDDs,
      builder.liveExecutors)
  }

  logDebug(s"live executors : $liveExecutors")

  private val _acceptFn: PartialFunction[SparkListenerEvent, Boolean] = {
    case e: SparkListenerExecutorAdded => liveExecutors.contains(e.executorId)
    case e: SparkListenerExecutorRemoved => liveExecutors.contains(e.executorId)
    case e: SparkListenerExecutorBlacklisted => liveExecutors.contains(e.executorId)
    case e: SparkListenerExecutorUnblacklisted => liveExecutors.contains(e.executorId)
    case e: SparkListenerStageExecutorMetrics => liveExecutors.contains(e.execId)
  }

  override def acceptFn(): PartialFunction[SparkListenerEvent, Boolean] = {
    _acceptFn.orElse(acceptFnForJobEvents)
  }
}
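
acceptFn() composes the two partial functions with orElse: executor-scoped events are decided first, then the inherited job-scoped cases, and anything neither matches stays undecided. A toy model of that semantics with placeholder event types (not the commit's own classes), showing how lift exposes the three outcomes:

// Placeholder event hierarchy, for illustration only.
sealed trait Event
case class ExecutorEvent(id: String) extends Event
case class JobEvent(jobId: Int) extends Event
case class OtherEvent(name: String) extends Event

val liveExecutors = Set("1", "3")
val liveJobs = Set(10)

val executorFn: PartialFunction[Event, Boolean] = {
  case ExecutorEvent(id) => liveExecutors.contains(id)
}
val jobFn: PartialFunction[Event, Boolean] = {
  case JobEvent(jobId) => liveJobs.contains(jobId)
}
val acceptFn = executorFn.orElse(jobFn)

// lift turns the partial function into Event => Option[Boolean]:
assert(acceptFn.lift(ExecutorEvent("1")) == Some(true))   // live executor: accept
assert(acceptFn.lift(JobEvent(99)) == Some(false))        // finished job: reject
assert(acceptFn.lift(OtherEvent("env update")) == None)   // unmatched: "don't care"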
Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import java.util.ServiceLoader

import scala.io.{Codec, Source}
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.deploy.history.EventFilter.FilterStatistic
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.util.{JsonProtocol, Utils}

/**
 * EventFilterBuilder provides the interface to gather information from events received
 * by [[SparkListenerInterface]], and to create a new [[EventFilter]] instance which leverages
 * the gathered information to decide whether an event should be accepted or not.
 */
private[spark] trait EventFilterBuilder extends SparkListenerInterface {
  def createFilter(): EventFilter
}

/** [[EventFilter]] decides whether the given event should be accepted or rejected. */
private[spark] trait EventFilter {
  /**
   * Provides statistics for this event filter, which are used to measure the score
   * of compaction.
   *
   * To keep the condition simple, the statistic fields are currently fixed, since the
   * major kinds of events compaction filters out are job-related event types. If the
   * filter doesn't track job-related events, it should return None instead.
   */
  def statistic(): Option[FilterStatistic]

  /**
   * Classifies whether the event is accepted or rejected by this filter.
   *
   * The method should return a partial function which matches the events for which the
   * filter can decide acceptance or rejection, and leave all other events unmatched.
   */
  def acceptFn(): PartialFunction[SparkListenerEvent, Boolean]
}

object EventFilter extends Logging {
  case class FilterStatistic(
      totalJobs: Long,
      liveJobs: Long,
      totalStages: Long,
      liveStages: Long,
      totalTasks: Long,
      liveTasks: Long)

  def applyFilterToFile(
      fs: FileSystem,
      filters: Seq[EventFilter],
      path: Path,
      onAccepted: (String, SparkListenerEvent) => Unit,
      onRejected: (String, SparkListenerEvent) => Unit,
      onUnidentified: String => Unit): Unit = {
    Utils.tryWithResource(EventLogFileReader.openEventLog(path, fs)) { in =>
      val lines = Source.fromInputStream(in)(Codec.UTF8).getLines()

      lines.zipWithIndex.foreach { case (line, lineNum) =>
        try {
          val event = try {
            Some(JsonProtocol.sparkEventFromJson(parse(line)))
          } catch {
            // ignore any exception thrown while parsing unidentified json
            case NonFatal(_) =>
              onUnidentified(line)
              None
          }

          event.foreach { e =>
            // An event is accepted unless at least one filter explicitly rejects it;
            // events no filter matches are kept as well.
            val results = filters.flatMap(_.acceptFn().lift.apply(e))
            if (results.isEmpty || !results.contains(false)) {
              onAccepted(line, e)
            } else {
              onRejected(line, e)
            }
          }
        } catch {
          case e: Exception =>
            logError(s"Exception parsing Spark event log: ${path.getName}", e)
            logError(s"Malformed line #$lineNum: $line\n")
            throw e
        }
      }
    }
  }
}
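
A sketch of how a caller might drive applyFilterToFile: write accepted lines to the compacted output, drop rejected ones, and (as one plausible policy) keep unparseable lines verbatim. The method, paths, and policy below are hypothetical illustrations, not the compactor code elsewhere in this commit:

import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical driver; `logPath` and `compactedPath` are assumed inputs.
def compactOne(
    fs: FileSystem,
    logPath: Path,
    compactedPath: Path,
    filters: Seq[EventFilter]): Unit = {
  val out = fs.create(compactedPath)
  def writeLine(line: String): Unit =
    out.write((line + "\n").getBytes(StandardCharsets.UTF_8))
  try {
    EventFilter.applyFilterToFile(fs, filters, logPath,
      onAccepted = (line, _) => writeLine(line),   // survives compaction
      onRejected = (_, _) => (),                   // dropped from the compacted log
      onUnidentified = line => writeLine(line))    // unparseable: kept verbatim here
  } finally {
    out.close()
  }
}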
