
Commit b0e4a7c

WIP [SPARK-29779][CORE] Compact old event log files and cleanup - no UTs yet
1 parent 888cc46 commit b0e4a7c

File tree

10 files changed: +679 −12 lines changed
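The compaction driver itself does not appear in the files shown below, but the shape of the new API suggests how it would be used: replay an old event log through a builder to reconstruct which jobs, stages, and tasks are still live, then rewrite the log keeping only the lines whose events pass the resulting filters. A rough, hypothetical sketch of such a pass follows; `JsonProtocol.sparkEventFromJson` and the listener event types are real Spark APIs, while `decide`, `keepLine`, and the keep/drop combination rule are assumptions, not code from this commit:

```scala
// Hypothetical compaction pass over the JSON lines of one old event log file.
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.scheduler._
import org.apache.spark.util.JsonProtocol

// Route an event to the matching filterXXX method (only a few cases shown).
def decide(filter: EventFilter, event: SparkListenerEvent): Option[Boolean] =
  event match {
    case e: SparkListenerJobStart => filter.filterJobStart(e)
    case e: SparkListenerJobEnd => filter.filterJobEnd(e)
    case e: SparkListenerTaskStart => filter.filterTaskStart(e)
    case e: SparkListenerTaskEnd => filter.filterTaskEnd(e)
    case e => filter.filterOtherEvent(e)
  }

// One plausible combination rule: keep a line if any filter votes to keep it,
// or if no filter voices an opinion at all.
def keepLine(line: String, filters: Seq[EventFilter]): Boolean = {
  val event = JsonProtocol.sparkEventFromJson(parse(line))
  val votes = filters.flatMap(f => decide(f, event))
  votes.contains(true) || !votes.contains(false)
}
```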
Lines changed: 1 addition & 0 deletions (new single-line file registering BasicEventFilterBuilder)

```
org.apache.spark.deploy.history.BasicEventFilterBuilder
```
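A single fully-qualified class name like this is the format of a `java.util.ServiceLoader` registration file; by convention it would live under `META-INF/services/` and be named after the trait it registers (presumably `org.apache.spark.deploy.history.EventFilterBuilder`), though the file path is not shown above. A hedged sketch of how a registered builder could then be discovered at runtime:

```scala
// Hypothetical discovery of registered EventFilterBuilder implementations via
// ServiceLoader; assumes the one-line file above is a META-INF/services entry.
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.deploy.history.EventFilterBuilder

val builders: Seq[EventFilterBuilder] = ServiceLoader
  .load(classOf[EventFilterBuilder], Thread.currentThread().getContextClassLoader)
  .asScala.toSeq
// Each builder is also a SparkListenerInterface, so it can be attached to a
// replay bus to accumulate state before createFilter() is called.
```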
Lines changed: 146 additions & 0 deletions (new file defining BasicEventFilterBuilder and BasicEventFilter)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._

/**
 * Listens to replayed events and tracks which jobs, stages, and tasks are
 * still live, so the filter it creates can tell live work apart from finished work.
 */
class BasicEventFilterBuilder extends SparkListener with EventFilterBuilder {
  val liveJobToStages = new mutable.HashMap[Int, Seq[Int]]
  val stageToTasks = new mutable.HashMap[Int, mutable.Set[Long]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    liveJobToStages += jobStart.jobId -> jobStart.stageIds
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // A finished job's stages, and their tasks, no longer need to be retained.
    val stages = liveJobToStages.getOrElse(jobEnd.jobId, Seq.empty[Int])
    liveJobToStages -= jobEnd.jobId
    stages.foreach { stage => stageToTasks -= stage }
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    val curTasks = stageToTasks.getOrElseUpdate(taskStart.stageId,
      mutable.HashSet[Long]())
    curTasks += taskStart.taskInfo.taskId
  }

  override def createFilter(): EventFilter = new BasicEventFilter(this)
}

/**
 * Accepts events referring to live jobs, stages, or tasks, rejects events for
 * finished ones, and stays neutral (None) on event types it does not track yet.
 */
class BasicEventFilter(trackListener: BasicEventFilterBuilder) extends EventFilter with Logging {

  private val liveTasks: Set[Long] = trackListener.stageToTasks.values match {
    case xs if xs.isEmpty => Set.empty[Long]
    case xs => xs.reduce(_ ++ _).toSet
  }

  if (log.isDebugEnabled) {
    logDebug(s"live jobs : ${trackListener.liveJobToStages.keySet}")
    logDebug(s"stages in jobs : ${trackListener.liveJobToStages.values.flatten}")
    logDebug(s"stages : ${trackListener.stageToTasks.keySet}")
    logDebug(s"tasks in stages : ${trackListener.stageToTasks.values.flatten}")
  }

  override def filterStageCompleted(event: SparkListenerStageCompleted): Option[Boolean] = {
    Some(trackListener.stageToTasks.contains(event.stageInfo.stageId))
  }

  override def filterStageSubmitted(event: SparkListenerStageSubmitted): Option[Boolean] = {
    Some(trackListener.stageToTasks.contains(event.stageInfo.stageId))
  }

  override def filterTaskStart(event: SparkListenerTaskStart): Option[Boolean] = {
    Some(liveTasks.contains(event.taskInfo.taskId))
  }

  override def filterTaskGettingResult(event: SparkListenerTaskGettingResult): Option[Boolean] = {
    Some(liveTasks.contains(event.taskInfo.taskId))
  }

  override def filterTaskEnd(event: SparkListenerTaskEnd): Option[Boolean] = {
    Some(liveTasks.contains(event.taskInfo.taskId))
  }

  override def filterJobStart(event: SparkListenerJobStart): Option[Boolean] = {
    Some(trackListener.liveJobToStages.contains(event.jobId))
  }

  override def filterJobEnd(event: SparkListenerJobEnd): Option[Boolean] = {
    Some(trackListener.liveJobToStages.contains(event.jobId))
  }

  override def filterUnpersistRDD(event: SparkListenerUnpersistRDD): Option[Boolean] = {
    // FIXME: need to track rdd ids?
    None
  }

  override def filterExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Option[Boolean] = {
    // FIXME: need to track live executors?
    None
  }

  override def filterStageExecutorMetrics(
      event: SparkListenerStageExecutorMetrics): Option[Boolean] = {
    // FIXME: need to track live executors?
    // handle this regardless of liveness of stage - good to restore metrics
    None
  }

  override def filterExecutorAdded(event: SparkListenerExecutorAdded): Option[Boolean] = {
    // FIXME: need to track live executors?
    None
  }

  override def filterExecutorRemoved(event: SparkListenerExecutorRemoved): Option[Boolean] = {
    // FIXME: need to track live executors?
    None
  }

  override def filterExecutorBlacklisted(
      event: SparkListenerExecutorBlacklisted): Option[Boolean] = {
    // FIXME: need to track live executors?
    None
  }

  override def filterExecutorBlacklistedForStage(
      event: SparkListenerExecutorBlacklistedForStage): Option[Boolean] = {
    // FIXME: need to track live executors?
    Some(trackListener.stageToTasks.contains(event.stageId))
  }

  override def filterNodeBlacklistedForStage(
      event: SparkListenerNodeBlacklistedForStage): Option[Boolean] = {
    // FIXME: need to track live executors?
    Some(trackListener.stageToTasks.contains(event.stageId))
  }

  override def filterExecutorUnblacklisted(
      event: SparkListenerExecutorUnblacklisted): Option[Boolean] = {
    // FIXME: need to track live executors?
    None
  }

  override def filterSpeculativeTaskSubmitted(
      event: SparkListenerSpeculativeTaskSubmitted): Option[Boolean] = {
    Some(trackListener.stageToTasks.contains(event.stageId))
  }
}
```
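A minimal, hypothetical exercise of the classes above. For brevity the builder's public maps are seeded directly for the job; during a real replay the `SparkListener` callbacks (`onJobStart`, `onTaskStart`, ...) would populate them:

```scala
import org.apache.spark.scheduler._

val builder = new BasicEventFilterBuilder
builder.liveJobToStages += 1 -> Seq(1, 2)   // pretend job 1 (stages 1 and 2) is live
val taskInfo = new TaskInfo(10L, 0, 0, 0L, "exec-1", "host-1",
  TaskLocality.PROCESS_LOCAL, false)
builder.onTaskStart(SparkListenerTaskStart(stageId = 1, stageAttemptId = 0, taskInfo))

val filter = builder.createFilter()
// Task 10 belongs to a tracked stage, so its events should be kept.
assert(filter.filterTaskStart(SparkListenerTaskStart(1, 0, taskInfo)) == Some(true))
// Job 2 was never seen as live, so its end event can be dropped.
assert(filter.filterJobEnd(SparkListenerJobEnd(2, 0L, JobSucceeded)) == Some(false))
```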
Lines changed: 82 additions & 0 deletions (new file defining the EventFilterBuilder and EventFilter traits)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.history

import org.apache.spark.scheduler._

/** Accumulates state from replayed events and creates an [[EventFilter]] from it. */
trait EventFilterBuilder extends SparkListenerInterface {
  def createFilter(): EventFilter
}

/**
 * Decides per event whether it should survive compaction: Some(true) votes to
 * keep the event, Some(false) votes to drop it, and None means this filter has
 * no opinion. Every method defaults to None, so implementations only need to
 * override the event types they track.
 */
trait EventFilter {
  def filterStageCompleted(event: SparkListenerStageCompleted): Option[Boolean] = None

  def filterStageSubmitted(event: SparkListenerStageSubmitted): Option[Boolean] = None

  def filterTaskStart(event: SparkListenerTaskStart): Option[Boolean] = None

  def filterTaskGettingResult(event: SparkListenerTaskGettingResult): Option[Boolean] = None

  def filterTaskEnd(event: SparkListenerTaskEnd): Option[Boolean] = None

  def filterJobStart(event: SparkListenerJobStart): Option[Boolean] = None

  def filterJobEnd(event: SparkListenerJobEnd): Option[Boolean] = None

  def filterEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Option[Boolean] = None

  def filterBlockManagerAdded(event: SparkListenerBlockManagerAdded): Option[Boolean] = None

  def filterBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Option[Boolean] = None

  def filterUnpersistRDD(event: SparkListenerUnpersistRDD): Option[Boolean] = None

  def filterApplicationStart(event: SparkListenerApplicationStart): Option[Boolean] = None

  def filterApplicationEnd(event: SparkListenerApplicationEnd): Option[Boolean] = None

  def filterExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Option[Boolean] = None

  def filterStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Option[Boolean] = None

  def filterExecutorAdded(event: SparkListenerExecutorAdded): Option[Boolean] = None

  def filterExecutorRemoved(event: SparkListenerExecutorRemoved): Option[Boolean] = None

  def filterExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Option[Boolean] = None

  def filterExecutorBlacklistedForStage(
      event: SparkListenerExecutorBlacklistedForStage): Option[Boolean] = None

  def filterNodeBlacklistedForStage(
      event: SparkListenerNodeBlacklistedForStage): Option[Boolean] = None

  def filterExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Option[Boolean] = None

  def filterNodeBlacklisted(event: SparkListenerNodeBlacklisted): Option[Boolean] = None

  def filterNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Option[Boolean] = None

  def filterBlockUpdated(event: SparkListenerBlockUpdated): Option[Boolean] = None

  def filterSpeculativeTaskSubmitted(
      event: SparkListenerSpeculativeTaskSubmitted): Option[Boolean] = None

  def filterOtherEvent(event: SparkListenerEvent): Option[Boolean] = None
}
```
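Because every method defaults to None, a custom filter only has to override the event types it cares about. For instance, a short sketch (not from this commit) of a filter that unconditionally drops executor metrics updates and stays neutral on everything else:

```scala
// Hypothetical custom filter: drop all executor metrics updates during
// compaction; every other event type keeps the default None ("no opinion").
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate

class DropMetricsEventFilter extends EventFilter {
  override def filterExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Option[Boolean] = Some(false)
}
```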
