@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{DebugFilesystem, SparkConf}
+import org.apache.spark.{CleanSpilledPartitionResult, DebugFilesystem, SparkConf, SpilledPartitionResultCleanupWorker}
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.{SimpleRepeatableIterator, SpilledResultIterator}
 import org.apache.spark.sql.internal.SQLConf
@@ -179,4 +179,104 @@ class SpillDirectResultSuite extends QueryTest with SQLTestUtils with SharedSparkSession
       }
     }
   }
+
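+  // Exercises the ContextCleaner path: a SpilledResultIterator that is never
+  // closed should have its spilled files deleted once the iterator becomes
+  // unreachable and the cleanup worker drains its reference buffer.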
+  test("DirectResult spilled result can be cleaned by context cleaner") {
+    val spilledPartitionResultCleanupWorker = sparkContext.cleaner.get.
+      getContextCleanupWorker(classOf[CleanSpilledPartitionResult].getName).
+      asInstanceOf[SpilledPartitionResultCleanupWorker]
+
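+    // Snapshot the delete counter so the test can later assert that spilled
+    // files were actually deleted, not just dereferenced.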
+    // val cleanCountBefore = spilledPartitionResultCleanupWorker.cleanCount.get
+    val deleteCountBefore = spilledPartitionResultCleanupWorker.deleteCount.get
+    var referenceBufferSizeBefore = 0
+
+    withTable(table2) {
+      newTable(table2, 300)
+
+      val query = s"SELECT * FROM $table2 t2 order by key"
+      val df = sql(query)
+      val it = df.collectAsIterator()
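+      // With this suite's spill threshold, a 300-row sorted result is expected
+      // to come back spilled rather than as an in-memory array.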
+      assert(it.isInstanceOf[SpilledResultIterator[_, _]])
+      assert(it.asInstanceOf[SpilledResultIterator[Any, Any]].rowCount == 300)
+
+      val rs = ArrayBuffer[Row]()
+      while (it.hasNext) {
+        rs.append(it.next())
+      }
+      assert(rs.length == 300)
+      var i = 0
+      rs.foreach(row => {
+        assert(row.getInt(0) == i)
+        i += 1
+      })
+
+      // Deliberately skip closing the iterator; cleanup must go through the context cleaner.
+      // it.close()
+
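+      // The cleanup worker should now hold a weak reference for the spilled
+      // partitions backing `it`.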
+      referenceBufferSizeBefore = spilledPartitionResultCleanupWorker.referenceBufferSize()
+      assert(referenceBufferSizeBefore > 0)
+
+      // Trigger GC and clean manually while `it` is still strongly reachable.
+      System.gc()
+
+      var tryCount = 1
+      while (spilledPartitionResultCleanupWorker.referenceBufferSize() > 0 && tryCount < 100) {
+        spilledPartitionResultCleanupWorker.clean()
+        tryCount += 1
+      }
+
+      // Nothing can be cleaned yet: `it` still strongly references the spilled results.
+      assert(spilledPartitionResultCleanupWorker.referenceBufferSize() > 0)
+    }
+
+    // With `it` out of scope, GC can enqueue the references and clean() drains them.
+    System.gc()
+
+    var tryCount = 1
+    while (spilledPartitionResultCleanupWorker.referenceBufferSize() > 0 && tryCount < 100) {
+      spilledPartitionResultCleanupWorker.clean()
+      tryCount += 1
+    }
+
+    // val cleanCountAfter = spilledPartitionResultCleanupWorker.cleanCount.get
+    assert(spilledPartitionResultCleanupWorker.referenceBufferSize() == 0)
+    // assert((cleanCountAfter - cleanCountBefore) == referenceBufferSizeBefore)
+    assert((spilledPartitionResultCleanupWorker.deleteCount.get - deleteCountBefore) > 0)
+  }
+
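+  // A failed collect must not leak spilled files: results spilled before the
+  // failure should still be reclaimed once nothing references them.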
+  test("spilled result can be cleaned properly when a task fails") {
+    val spilledPartitionResultCleanupWorker = sparkContext.cleaner.get.
+      getContextCleanupWorker(classOf[CleanSpilledPartitionResult].getName).
+      asInstanceOf[SpilledPartitionResultCleanupWorker]
+    withTable(table2) {
+      newTable(table2, 30000)
+
+      val query = s"SELECT * FROM $table2 t2 order by key"
+      val df = sql(query)
+      intercept[Exception] {
+        df.collectAsIterator()
+      }
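+      // The failed collect leaves orphaned spilled partitions behind; GC plus
+      // repeated clean() passes should reclaim all of them.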
+      System.gc()
+
+      var tryCount = 1
+      while (spilledPartitionResultCleanupWorker.referenceBufferSize() > 0 && tryCount < 100) {
+        spilledPartitionResultCleanupWorker.clean()
+        tryCount += 1
+        System.gc()
+      }
+      assert(spilledPartitionResultCleanupWorker.referenceBufferSize() == 0)
+    }
+  }
 }