@@ -79,47 +79,47 @@ class AppStatusStoreSuite extends SparkFunSuite {
79
79
assert(store.count(classOf [CachedQuantile ]) === 2 )
80
80
}
81
81
82
- private def createLiveStore ( inMemoryStore : InMemoryStore ): AppStatusStore = {
82
+ private def createAppStore ( store : KVStore , live : Boolean = false ): AppStatusStore = {
83
83
val conf = new SparkConf ()
84
- val store = new ElementTrackingStore (inMemoryStore, conf)
85
- val listener = new AppStatusListener (store, conf, true , None )
86
- new AppStatusStore (store, listener = Some (listener))
87
- }
88
-
89
- test(" SPARK-28638: only successful tasks have taskSummary when with in memory kvstore" ) {
90
- val store = new InMemoryStore ()
91
- (0 until 5 ).foreach { i => store.write(newTaskData(i, status = " FAILED" )) }
92
- Seq (new AppStatusStore (store), createLiveStore(store)).foreach { appStore =>
93
- val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
94
- assert(summary.size === 0 )
84
+ if (live) {
85
+ AppStatusStore .createLiveStore(conf)
86
+ } else {
87
+ new AppStatusStore (store)
95
88
}
96
89
}
97
90
98
- test(" SPARK-26260: only successful tasks have taskSummary when with disk kvstore (LevelDB) " ) {
91
+ test(" SPARK-26260: task summary should contain only successful tasks' metrics " ) {
99
92
val testDir = Utils .createTempDir()
100
- val diskStore = KVUtils .open(testDir, getClass().getName())
101
-
102
- (0 until 5 ).foreach { i => diskStore.write(newTaskData(i, status = " FAILED" )) }
103
- Seq (new AppStatusStore (diskStore)).foreach { appStore =>
104
- val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
105
- assert(summary.size === 0 )
106
- }
107
- diskStore.close()
108
- Utils .deleteRecursively(testDir)
109
- }
110
-
111
- test(" SPARK-28638: summary should contain successful tasks only when with in memory kvstore" ) {
112
- val store = new InMemoryStore ()
113
-
114
- for (i <- 0 to 5 ) {
115
- if (i % 2 == 1 ) {
116
- store.write(newTaskData(i, status = " FAILED" ))
117
- } else {
118
- store.write(newTaskData(i))
93
+ val diskStore = KVUtils .open(testDir, getClass.getName)
94
+ val inMemoryStore = new InMemoryStore
95
+
96
+ val historyDiskAppStore = createAppStore(diskStore)
97
+ val historyInMemoryAppStore = createAppStore(inMemoryStore)
98
+ val liveAppStore = createAppStore(inMemoryStore, live = true )
99
+
100
+ Seq (historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore =>
101
+ val store = appStore.store
102
+ // Success and failed tasks metrics
103
+ for (i <- 0 to 5 ) {
104
+ if (i % 2 == 1 ) {
105
+ store.write(newTaskData(i, status = " FAILED" ))
106
+ } else {
107
+ store.write(newTaskData(i, status = " SUCCESS" ))
108
+ }
109
+ }
110
+ // Running tasks metrics (default metrics, positive metrics)
111
+ Seq (- 1 , 6 ).foreach { metric =>
112
+ store.write(newTaskData(metric, status = " RUNNING" ))
119
113
}
120
- }
121
114
122
- Seq (new AppStatusStore (store), createLiveStore(store)).foreach { appStore =>
115
+ /**
116
+ * Following are the tasks metrics,
117
+ * 0, 2, 4 => Success
118
+ * 1, 3, 5 => Failed
119
+ * -1, 6 => Running
120
+ *
121
+ * Task summary will consider (0, 2, 4) only
122
+ */
123
123
val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
124
124
125
125
val values = Array (0.0 , 2.0 , 4.0 )
@@ -128,42 +128,28 @@ class AppStatusStoreSuite extends SparkFunSuite {
128
128
dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
129
129
assert(expected === actual)
130
130
}
131
+ appStore.close()
131
132
}
133
+ Utils .deleteRecursively(testDir)
132
134
}
133
135
134
-
135
- test(" SPARK-26260: task summary size for default metrics should be zero" ) {
136
- val store = new InMemoryStore ()
137
- store.write(newTaskData(- 1 , status = " RUNNING" ))
138
- Seq (new AppStatusStore (store), createLiveStore(store)).foreach { appStore =>
139
- val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
140
- assert(summary.size === 0 )
141
- }
142
- }
143
-
144
- test(" SPARK-26260: summary should contain successful tasks only when with LevelDB store" ) {
136
+ test(" SPARK-26260: task summary should be empty for non-successful tasks" ) {
137
+ // This test will check for 0 metric value for failed task
145
138
val testDir = Utils .createTempDir()
146
- val diskStore = KVUtils .open(testDir, getClass().getName())
147
-
148
- for (i <- 0 to 5 ) {
149
- if (i % 2 == 1 ) {
150
- diskStore.write(newTaskData(i, status = " FAILED" ))
151
- } else {
152
- diskStore.write(newTaskData(i))
153
- }
154
- }
139
+ val diskStore = KVUtils .open(testDir, getClass.getName)
140
+ val inMemoryStore = new InMemoryStore
155
141
156
- Seq (new AppStatusStore (diskStore)).foreach { appStore =>
157
- val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
158
-
159
- val values = Array (0.0 , 2.0 , 4.0 )
142
+ val historyDiskAppStore = createAppStore(diskStore)
143
+ val historyInMemoryAppStore = createAppStore(inMemoryStore)
144
+ val liveAppStore = createAppStore(inMemoryStore, live = true )
160
145
161
- val dist = new Distribution (values, 0 , values.length).getQuantiles(uiQuantiles.sorted)
162
- dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
163
- assert(expected === actual)
164
- }
146
+ Seq (historyDiskAppStore, historyInMemoryAppStore, liveAppStore).foreach { appStore =>
147
+ val store = appStore.store
148
+ (0 until 5 ).foreach { i => store.write(newTaskData(i, status = " FAILED" )) }
149
+ val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles)
150
+ assert(summary.size === 0 )
151
+ appStore.close()
165
152
}
166
- diskStore.close()
167
153
Utils .deleteRecursively(testDir)
168
154
}
169
155
0 commit comments